Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.schema.fetch_data
import json
from enum import Enum
from typing import (
List,
Optional,
Union,
)
from pydantic import (
ConfigDict,
Field,
field_validator,
Json,
)
from typing_extensions import (
Annotated,
Literal,
)
from galaxy.schema.fields import DecodedDatabaseIdField
from galaxy.schema.schema import Model
from galaxy.schema.types import CoercedStringType
[docs]class ElementsFromType(str, Enum):
archive = "archive"
bagit = "bagit"
bagit_archive = "bagit_archive"
directory = "directory"
AutoDecompressField = Field(False, description="Decompress compressed data before sniffing?")
[docs]class ItemsFromSrc(str, Enum):
url = "url"
files = "files"
path = "path"
ftp_import = "ftp_import"
server_dir = "server_dir"
[docs]class Src(str, Enum):
url = "url"
pasted = "pasted"
files = "files"
path = "path"
composite = "composite"
ftp_import = "ftp_import"
server_dir = "server_dir"
[docs]class DestinationType(str, Enum):
library = "library"
library_folder = "library_folder"
hdcas = "hdcas"
hdas = "hdas"
[docs]class LibraryFolderDestination(FetchBaseModel):
type: Literal["library_folder"]
library_folder_id: DecodedDatabaseIdField # For some reason this folder ID must NOT have the 'F' prefix
[docs]class BaseCollectionTarget(BaseFetchDataTarget):
destination: HdcaDestination
collection_type: Optional[str] = None
tags: Optional[List[str]] = None
name: Optional[str] = None
[docs]class LibraryDestination(FetchBaseModel):
type: Literal["library"]
name: str = Field(..., description="Must specify a library name")
description: Optional[str] = Field(None, description="Description for library to create")
synopsis: Optional[str] = Field(None, description="Description for library to create")
[docs]class ExtraFiles(FetchBaseModel):
items_from: Optional[str] = None
src: Src
fuzzy_root: Optional[bool] = Field(
True,
description="Prevent Galaxy from checking for a single file in a directory and re-interpreting the archive",
)
[docs]class FetchDatasetHash(Model):
hash_function: Literal["MD5", "SHA-1", "SHA-256", "SHA-512"]
hash_value: str
model_config = ConfigDict(extra="forbid")
[docs]class BaseDataElement(FetchBaseModel):
name: Optional[CoercedStringType] = None
dbkey: str = Field("?")
info: Optional[str] = None
ext: str = Field("auto")
space_to_tab: bool = False
to_posix_lines: bool = False
deferred: bool = False
tags: Optional[List[str]] = None
created_from_basename: Optional[str] = None
extra_files: Optional[ExtraFiles] = None
auto_decompress: bool = AutoDecompressField
items_from: Optional[ElementsFromType] = Field(None, alias="elements_from")
collection_type: Optional[str] = None
MD5: Optional[str] = None
SHA1: Optional[str] = Field(None, alias="SHA-1")
SHA256: Optional[str] = Field(None, alias="SHA-256")
SHA512: Optional[str] = Field(None, alias="SHA-512")
hashes: Optional[List[FetchDatasetHash]] = None
description: Optional[str] = None
model_config = ConfigDict(extra="forbid")
[docs]class PastedDataElement(BaseDataElement):
src: Literal["pasted"]
paste_content: CoercedStringType = Field(..., description="Content to upload")
[docs]class UrlDataElement(BaseDataElement):
src: Literal["url"]
url: str = Field(..., description="URL to upload")
[docs]class ServerDirElement(BaseDataElement):
src: Literal["server_dir"]
server_dir: str
link_data_only: Optional[bool] = None
[docs]class FtpImportElement(BaseDataElement):
src: Literal["ftp_import"]
ftp_path: str
collection_type: Optional[str] = None
[docs]class ItemsFromModel(Model):
src: ItemsFromSrc
path: Optional[str] = None
ftp_path: Optional[str] = None
server_dir: Optional[str] = None
url: Optional[str] = None
[docs]class FtpImportTarget(BaseCollectionTarget):
src: Literal["ftp_import"]
ftp_path: str
items_from: Optional[ElementsFromType] = Field(None, alias="elements_from")
[docs]class PathDataElement(BaseDataElement):
src: Literal["path"]
path: str
items_from: Optional[ElementsFromType] = Field(None, alias="elements_from")
link_data_only: Optional[bool] = None
[docs]class CompositeDataElement(BaseDataElement):
src: Literal["composite"]
composite: "CompositeItems"
[docs]class CompositeItems(FetchBaseModel):
items: List[
Union[FileDataElement, PastedDataElement, UrlDataElement, PathDataElement, ServerDirElement, FtpImportElement]
] = Field(..., alias="elements")
CompositeDataElement.model_rebuild()
[docs]class NestedElement(BaseDataElement):
items: List[Union["AnyElement", "NestedElement"]] = Field(..., alias="elements")
AnyElement = Annotated[
Union[
FileDataElement,
PastedDataElement,
UrlDataElement,
PathDataElement,
ServerDirElement,
FtpImportElement,
CompositeDataElement,
],
Field(default_factory=None, discriminator="src"),
]
# Seems to be a bug in pydantic ... can't reuse AnyElement in more than one model
AnyElement2 = Annotated[
Union[
FileDataElement,
PastedDataElement,
UrlDataElement,
PathDataElement,
ServerDirElement,
FtpImportElement,
CompositeDataElement,
],
Field(default_factory=None, discriminator="src"),
]
NestedElement.model_rebuild()
[docs]class BaseDataTarget(BaseFetchDataTarget):
destination: Union[HdaDestination, LibraryFolderDestination, LibraryDestination] = Field(..., discriminator="type")
[docs]class DataElementsTarget(BaseDataTarget):
items: List[Union[AnyElement, NestedElement]] = Field(..., alias="elements")
[docs]class DataElementsFromTarget(BaseDataTarget, ItemsFromModel):
items_from: ElementsFromType = Field(..., alias="elements_from")
[docs]class HdcaDataItemsTarget(BaseCollectionTarget):
items: List[Union[AnyElement2, NestedElement]] = Field(..., alias="elements")
[docs]class HdcaDataItemsFromTarget(BaseCollectionTarget, ItemsFromModel):
items_from: ElementsFromType = Field(..., alias="elements_from")
[docs]class BaseDataPayload(FetchBaseModel):
history_id: DecodedDatabaseIdField
model_config = ConfigDict(extra="allow")
[docs] @field_validator("targets", mode="before", check_fields=False)
@classmethod
def targets_string_to_json(cls, v):
if isinstance(v, str):
return json.loads(v)
return v
Targets = List[
Union[
DataElementsTarget,
HdcaDataItemsTarget,
DataElementsFromTarget,
HdcaDataItemsFromTarget,
FtpImportTarget,
]
]