Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for cwltool.utils
"""Shared functions and other definitions."""
import collections
try:
import fcntl
except ImportError:
# Guard against `from .utils import ...` on windows.
# See windows_check() in main.py
pass
import importlib.metadata
import os
import random
import shutil
import stat
import string
import subprocess # nosec
import sys
import tempfile
import urllib
import uuid
from datetime import datetime
from email.utils import parsedate_to_datetime
from functools import partial
from itertools import zip_longest
from pathlib import Path, PurePosixPath
from tempfile import NamedTemporaryFile
from typing import (
IO,
TYPE_CHECKING,
Any,
Callable,
Deque,
Dict,
Generator,
Iterable,
List,
Literal,
MutableMapping,
MutableSequence,
NamedTuple,
Optional,
Set,
Tuple,
TypedDict,
Union,
cast,
)
import requests
from cachecontrol import CacheControl
from cachecontrol.caches import FileCache
from mypy_extensions import mypyc_attr
from schema_salad.exceptions import ValidationException
from schema_salad.ref_resolver import Loader
if sys.version_info >= (3, 9):
from importlib.resources import as_file, files
else:
from importlib_resources import as_file, files
if TYPE_CHECKING:
from .command_line_tool import CallbackJob, ExpressionJob
from .job import CommandLineJob, JobBase
from .stdfsaccess import StdFsAccess
from .workflow_job import WorkflowJob
__all__ = ["files", "as_file"]
__random_outdir: Optional[str] = None
CONTENT_LIMIT = 64 * 1024
DEFAULT_TMP_PREFIX = tempfile.gettempdir() + os.path.sep
processes_to_kill: Deque["subprocess.Popen[str]"] = collections.deque()
CWLOutputType = Union[
None,
bool,
str,
int,
float,
MutableSequence["CWLOutputType"],
MutableMapping[str, "CWLOutputType"],
]
CWLObjectType = MutableMapping[str, Optional[CWLOutputType]]
"""Typical raw dictionary found in lightly parsed CWL."""
JobsType = Union["CommandLineJob", "JobBase", "WorkflowJob", "ExpressionJob", "CallbackJob"]
JobsGeneratorType = Generator[Optional[JobsType], None, None]
OutputCallbackType = Callable[[Optional[CWLObjectType], str], None]
ResolverType = Callable[["Loader", str], Optional[str]]
DestinationsType = MutableMapping[str, Optional[CWLOutputType]]
ScatterDestinationsType = MutableMapping[str, List[Optional[CWLOutputType]]]
ScatterOutputCallbackType = Callable[[Optional[ScatterDestinationsType], str], None]
SinkType = Union[CWLOutputType, CWLObjectType]
DirectoryType = TypedDict(
"DirectoryType", {"class": str, "listing": List[CWLObjectType], "basename": str}
)
JSONType = Union[Dict[str, "JSONType"], List["JSONType"], str, int, float, bool, None]
class WorkflowStateItem(NamedTuple):
"""Workflow state item."""
parameter: CWLObjectType
value: Optional[CWLOutputType]
success: str
ParametersType = List[CWLObjectType]
StepType = CWLObjectType # WorkflowStep
LoadListingType = Union[Literal["no_listing"], Literal["shallow_listing"], Literal["deep_listing"]]
def versionstring() -> str:
"""Version of CWLtool used to execute the workflow."""
if pkg := importlib.metadata.version("cwltool"):
return f"{sys.argv[0]} {pkg}"
return "{} {}".format(sys.argv[0], "unknown version")
def aslist(thing: Any) -> MutableSequence[Any]:
"""Wrap any non-MutableSequence/list in a list."""
if isinstance(thing, MutableSequence):
return thing
return [thing]
def copytree_with_merge(src: str, dst: str) -> None:
if not os.path.exists(dst):
os.makedirs(dst)
shutil.copystat(src, dst)
lst = os.listdir(src)
for item in lst:
spath = os.path.join(src, item)
dpath = os.path.join(dst, item)
if os.path.isdir(spath):
copytree_with_merge(spath, dpath)
else:
shutil.copy2(spath, dpath)
def cmp_like_py2(dict1: Dict[str, Any], dict2: Dict[str, Any]) -> int:
"""
Compare in the same manner as Python2.
Comparison function to be used in sorting as python3 doesn't allow sorting
of different types like str() and int().
This function re-creates sorting nature in py2 of heterogeneous list of
`int` and `str`
"""
# extract lists from both dicts
first, second = dict1["position"], dict2["position"]
# iterate through both list till max of their size
for i, j in zip_longest(first, second):
if i == j:
continue
# in case 1st list is smaller
# should come first in sorting
if i is None:
return -1
# if 1st list is longer,
# it should come later in sort
elif j is None:
return 1
# if either of the list contains str element
# at any index, both should be str before comparing
if isinstance(i, str) or isinstance(j, str):
return 1 if str(i) > str(j) else -1
# int comparison otherwise
return 1 if i > j else -1
# if both lists are equal
return 0
def bytes2str_in_dicts(
inp: Union[MutableMapping[str, Any], MutableSequence[Any], Any],
) -> Union[str, MutableSequence[Any], MutableMapping[str, Any]]:
"""
Convert any present byte string to unicode string, inplace.
input is a dict of nested dicts and lists
"""
# if input is dict, recursively call for each value
if isinstance(inp, MutableMapping):
for k in inp:
inp[k] = bytes2str_in_dicts(inp[k])
return inp
# if list, iterate through list and fn call
# for all its elements
if isinstance(inp, MutableSequence):
for idx, value in enumerate(inp):
inp[idx] = bytes2str_in_dicts(value)
return inp
# if value is bytes, return decoded string,
elif isinstance(inp, bytes):
return inp.decode("utf-8")
# simply return elements itself
return inp
[docs]def visit_class(rec: Any, cls: Iterable[Any], op: Callable[..., Any]) -> None:
"""Apply a function to with "class" in cls."""
if isinstance(rec, MutableMapping):
if "class" in rec and rec.get("class") in cls:
op(rec)
for d in rec:
visit_class(rec[d], cls, op)
if isinstance(rec, MutableSequence):
for d in rec:
visit_class(d, cls, op)
def visit_field(rec: Any, field: str, op: Callable[..., Any]) -> None:
"""Apply a function to mapping with 'field'."""
if isinstance(rec, MutableMapping):
if field in rec:
rec[field] = op(rec[field])
for d in rec:
visit_field(rec[d], field, op)
if isinstance(rec, MutableSequence):
for d in rec:
visit_field(d, field, op)
def random_outdir() -> str:
"""Return the random directory name chosen to use for tool / workflow output."""
global __random_outdir
if not __random_outdir:
__random_outdir = "/" + "".join(
[random.choice(string.ascii_letters) for _ in range(6)] # nosec
)
return __random_outdir
return __random_outdir
def shared_file_lock(fd: IO[Any]) -> None:
fcntl.flock(fd.fileno(), fcntl.LOCK_SH)
def upgrade_lock(fd: IO[Any]) -> None:
fcntl.flock(fd.fileno(), fcntl.LOCK_EX)
def adjustFileObjs(rec: Any, op: Union[Callable[..., Any], "partial[Any]"]) -> None:
"""Apply an update function to each File object in the object `rec`."""
visit_class(rec, ("File",), op)
def adjustDirObjs(rec: Any, op: Union[Callable[..., Any], "partial[Any]"]) -> None:
"""Apply an update function to each Directory object in the object `rec`."""
visit_class(rec, ("Directory",), op)
def dedup(listing: List[CWLObjectType]) -> List[CWLObjectType]:
marksub = set()
def mark(d: Dict[str, str]) -> None:
marksub.add(d["location"])
for entry in listing:
if entry["class"] == "Directory":
for e in cast(List[CWLObjectType], entry.get("listing", [])):
adjustFileObjs(e, mark)
adjustDirObjs(e, mark)
dd = []
markdup: Set[str] = set()
for r in listing:
if r["location"] not in marksub and r["location"] not in markdup:
dd.append(r)
markdup.add(cast(str, r["location"]))
return dd
def get_listing(fs_access: "StdFsAccess", rec: CWLObjectType, recursive: bool = True) -> None:
"""Expand, recursively, any 'listing' fields in a Directory."""
if rec.get("class") != "Directory":
finddirs: List[CWLObjectType] = []
visit_class(rec, ("Directory",), finddirs.append)
for f in finddirs:
get_listing(fs_access, f, recursive=recursive)
return
if "listing" in rec:
return
listing: List[CWLOutputType] = []
loc = cast(str, rec["location"])
for ld in fs_access.listdir(loc):
parse = urllib.parse.urlparse(ld)
bn = os.path.basename(urllib.request.url2pathname(parse.path))
if fs_access.isdir(ld):
ent: MutableMapping[str, Any] = {
"class": "Directory",
"location": ld,
"basename": bn,
}
if recursive:
get_listing(fs_access, ent, recursive)
listing.append(ent)
else:
listing.append({"class": "File", "location": ld, "basename": bn})
rec["listing"] = listing
def trim_listing(obj: Dict[str, Any]) -> None:
"""
Remove 'listing' field from Directory objects that are file references.
It redundant and potentially expensive to pass fully enumerated Directory
objects around if not explicitly needed, so delete the 'listing' field when
it is safe to do so.
"""
if obj.get("location", "").startswith("file://") and "listing" in obj:
del obj["listing"]
def downloadHttpFile(httpurl: str) -> Tuple[str, Optional[datetime]]:
"""
Download a remote file, possibly using a locally cached copy.
Returns a tuple:
- the local path for the downloaded file
- the Last-Modified timestamp if received from the remote server.
"""
cache_session = None
if "XDG_CACHE_HOME" in os.environ:
directory = os.environ["XDG_CACHE_HOME"]
elif "HOME" in os.environ:
directory = os.environ["HOME"]
else:
directory = os.path.expanduser("~")
cache_session = CacheControl(
requests.Session(),
cache=FileCache(os.path.join(directory, ".cache", "cwltool")),
)
r = cache_session.get(httpurl, stream=True)
with NamedTemporaryFile(mode="wb", delete=False) as f:
for chunk in r.iter_content(chunk_size=16384):
if chunk: # filter out keep-alive new chunks
f.write(chunk)
r.close()
date_raw: Optional[str] = r.headers.get("Last-Modified", None)
date: Optional[datetime] = parsedate_to_datetime(date_raw) if date_raw else None
if date:
date_epoch = date.timestamp()
os.utime(f.name, (date_epoch, date_epoch))
return str(f.name), date
def ensure_writable(path: str, include_root: bool = False) -> None:
"""
Ensure that 'path' is writable.
If 'path' is a directory, then all files and directories under 'path' are
made writable, recursively. If 'path' is a file or if 'include_root' is
`True`, then 'path' itself is made writable.
"""
def add_writable_flag(p: str) -> None:
st = os.stat(p)
mode = stat.S_IMODE(st.st_mode)
os.chmod(p, mode | stat.S_IWUSR)
if os.path.isdir(path):
if include_root:
add_writable_flag(path)
for root, dirs, files_ in os.walk(path):
for name in files_:
add_writable_flag(os.path.join(root, name))
for name in dirs:
add_writable_flag(os.path.join(root, name))
else:
add_writable_flag(path)
def ensure_non_writable(path: str) -> None:
"""Attempt to change permissions to ensure that a path is not writable."""
if os.path.isdir(path):
for root, dirs, files_ in os.walk(path):
for name in files_:
j = os.path.join(root, name)
st = os.stat(j)
mode = stat.S_IMODE(st.st_mode)
os.chmod(j, mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH)
for name in dirs:
j = os.path.join(root, name)
st = os.stat(j)
mode = stat.S_IMODE(st.st_mode)
os.chmod(j, mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH)
else:
st = os.stat(path)
mode = stat.S_IMODE(st.st_mode)
os.chmod(path, mode & ~stat.S_IWUSR & ~stat.S_IWGRP & ~stat.S_IWOTH)
[docs]def normalizeFilesDirs(
job: Optional[
Union[
MutableSequence[MutableMapping[str, Any]],
MutableMapping[str, Any],
DirectoryType,
]
]
) -> None:
def addLocation(d: Dict[str, Any]) -> None:
if "location" not in d:
if d["class"] == "File" and ("contents" not in d):
raise ValidationException(
"Anonymous file object must have 'contents' and 'basename' fields."
)
if d["class"] == "Directory" and ("listing" not in d or "basename" not in d):
raise ValidationException(
"Anonymous directory object must have 'listing' and 'basename' fields."
)
d["location"] = "_:" + str(uuid.uuid4())
if "basename" not in d:
d["basename"] = d["location"][2:]
parse = urllib.parse.urlparse(d["location"])
path = parse.path
# strip trailing slash
if path.endswith("/"):
if d["class"] != "Directory":
raise ValidationException(
"location '%s' ends with '/' but is not a Directory" % d["location"]
)
path = path.rstrip("/")
d["location"] = urllib.parse.urlunparse(
(
parse.scheme,
parse.netloc,
path,
parse.params,
parse.query,
parse.fragment,
)
)
if not d.get("basename"):
if path.startswith("_:"):
d["basename"] = str(path[2:])
else:
d["basename"] = str(os.path.basename(urllib.request.url2pathname(path)))
if d["class"] == "File":
nr, ne = os.path.splitext(d["basename"])
if d.get("nameroot") != nr:
d["nameroot"] = str(nr)
if d.get("nameext") != ne:
d["nameext"] = str(ne)
visit_class(job, ("File", "Directory"), addLocation)
def posix_path(local_path: str) -> str:
return str(PurePosixPath(Path(local_path)))
def local_path(posix_path: str) -> str:
return str(Path(posix_path))
def create_tmp_dir(tmpdir_prefix: str) -> str:
"""Create a temporary directory that respects the given tmpdir_prefix."""
tmp_dir, tmp_prefix = os.path.split(tmpdir_prefix)
return tempfile.mkdtemp(prefix=tmp_prefix, dir=tmp_dir)
@mypyc_attr(allow_interpreted_subclasses=True)
class HasReqsHints:
"""Base class for get_requirement()."""
def __init__(self) -> None:
"""Initialize this reqs decorator."""
self.requirements: List[CWLObjectType] = []
self.hints: List[CWLObjectType] = []
def get_requirement(self, feature: str) -> Tuple[Optional[CWLObjectType], Optional[bool]]:
"""Retrieve the named feature from the requirements field, or the hints field."""
for item in reversed(self.requirements):
if item["class"] == feature:
return (item, True)
for item in reversed(self.hints):
if item["class"] == feature:
return (item, False)
return (None, None)