Source code for galaxy.schema.fetch_data

import json
from enum import Enum
from typing import (
    List,
    Optional,
    Union,
)

from pydantic import (
    ConfigDict,
    Field,
    field_validator,
    Json,
)
from typing_extensions import (
    Annotated,
    Literal,
)

from galaxy.schema.fields import DecodedDatabaseIdField
from galaxy.schema.schema import Model
from galaxy.schema.types import CoercedStringType


[docs]class FetchBaseModel(Model): model_config = ConfigDict(populate_by_name=True)
[docs]class ElementsFromType(str, Enum): archive = "archive" bagit = "bagit" bagit_archive = "bagit_archive" directory = "directory"
AutoDecompressField = Field(False, description="Decompress compressed data before sniffing?")
[docs]class BaseFetchDataTarget(FetchBaseModel): auto_decompress: bool = AutoDecompressField
[docs]class ItemsFromSrc(str, Enum): url = "url" files = "files" path = "path" ftp_import = "ftp_import" server_dir = "server_dir"
[docs]class Src(str, Enum): url = "url" pasted = "pasted" files = "files" path = "path" composite = "composite" ftp_import = "ftp_import" server_dir = "server_dir"
[docs]class DestinationType(str, Enum): library = "library" library_folder = "library_folder" hdcas = "hdcas" hdas = "hdas"
[docs]class HdaDestination(FetchBaseModel): type: Literal["hdas"]
[docs]class HdcaDestination(FetchBaseModel): type: Literal["hdca"]
[docs]class LibraryFolderDestination(FetchBaseModel): type: Literal["library_folder"] library_folder_id: DecodedDatabaseIdField # For some reason this folder ID must NOT have the 'F' prefix
[docs]class BaseCollectionTarget(BaseFetchDataTarget): destination: HdcaDestination collection_type: Optional[str] = None tags: Optional[List[str]] = None name: Optional[str] = None
[docs]class LibraryDestination(FetchBaseModel): type: Literal["library"] name: str = Field(..., description="Must specify a library name") description: Optional[str] = Field(None, description="Description for library to create") synopsis: Optional[str] = Field(None, description="Description for library to create")
[docs]class ExtraFiles(FetchBaseModel): items_from: Optional[str] = None src: Src fuzzy_root: Optional[bool] = Field( True, description="Prevent Galaxy from checking for a single file in a directory and re-interpreting the archive", )
[docs]class BaseDataElement(FetchBaseModel): name: Optional[CoercedStringType] = None dbkey: str = Field("?") info: Optional[str] = None ext: str = Field("auto") space_to_tab: bool = False to_posix_lines: bool = False deferred: bool = False tags: Optional[List[str]] = None created_from_basename: Optional[str] = None extra_files: Optional[ExtraFiles] = None auto_decompress: bool = AutoDecompressField items_from: Optional[ElementsFromType] = Field(None, alias="elements_from") collection_type: Optional[str] = None MD5: Optional[str] = None description: Optional[str] = None model_config = ConfigDict(extra="forbid")
[docs]class FileDataElement(BaseDataElement): src: Literal["files"]
[docs]class PastedDataElement(BaseDataElement): src: Literal["pasted"] paste_content: CoercedStringType = Field(..., description="Content to upload")
[docs]class UrlDataElement(BaseDataElement): src: Literal["url"] url: str = Field(..., description="URL to upload")
[docs]class ServerDirElement(BaseDataElement): src: Literal["server_dir"] server_dir: str link_data_only: Optional[bool] = None
[docs]class FtpImportElement(BaseDataElement): src: Literal["ftp_import"] ftp_path: str collection_type: Optional[str] = None
[docs]class ItemsFromModel(Model): src: ItemsFromSrc path: Optional[str] = None ftp_path: Optional[str] = None server_dir: Optional[str] = None url: Optional[str] = None
[docs]class FtpImportTarget(BaseCollectionTarget): src: Literal["ftp_import"] ftp_path: str items_from: Optional[ElementsFromType] = Field(None, alias="elements_from")
[docs]class PathDataElement(BaseDataElement): src: Literal["path"] path: str items_from: Optional[ElementsFromType] = Field(None, alias="elements_from") link_data_only: Optional[bool] = None
[docs]class CompositeDataElement(BaseDataElement): src: Literal["composite"] composite: "CompositeItems"
[docs]class CompositeItems(FetchBaseModel): items: List[ Union[FileDataElement, PastedDataElement, UrlDataElement, PathDataElement, ServerDirElement, FtpImportElement] ] = Field(..., alias="elements")
CompositeDataElement.model_rebuild()
[docs]class NestedElement(BaseDataElement): items: List[Union["AnyElement", "NestedElement"]] = Field(..., alias="elements")
AnyElement = Annotated[ Union[ FileDataElement, PastedDataElement, UrlDataElement, PathDataElement, ServerDirElement, FtpImportElement, CompositeDataElement, ], Field(default_factory=None, discriminator="src"), ] # Seems to be a bug in pydantic ... can't reuse AnyElement in more than one model AnyElement2 = Annotated[ Union[ FileDataElement, PastedDataElement, UrlDataElement, PathDataElement, ServerDirElement, FtpImportElement, CompositeDataElement, ], Field(default_factory=None, discriminator="src"), ] NestedElement.model_rebuild()
[docs]class BaseDataTarget(BaseFetchDataTarget): destination: Union[HdaDestination, LibraryFolderDestination, LibraryDestination] = Field(..., discriminator="type")
[docs]class DataElementsTarget(BaseDataTarget): items: List[Union[AnyElement, NestedElement]] = Field(..., alias="elements")
[docs]class DataElementsFromTarget(BaseDataTarget, ItemsFromModel): items_from: ElementsFromType = Field(..., alias="elements_from")
[docs]class HdcaDataItemsTarget(BaseCollectionTarget): items: List[Union[AnyElement2, NestedElement]] = Field(..., alias="elements")
[docs]class HdcaDataItemsFromTarget(BaseCollectionTarget, ItemsFromModel): items_from: ElementsFromType = Field(..., alias="elements_from")
[docs]class FilesPayload(Model): filename: str local_filename: str
[docs]class BaseDataPayload(FetchBaseModel): history_id: DecodedDatabaseIdField model_config = ConfigDict(extra="allow")
[docs] @field_validator("targets", mode="before", check_fields=False) @classmethod def targets_string_to_json(cls, v): if isinstance(v, str): return json.loads(v) return v
Targets = List[ Union[ DataElementsTarget, HdcaDataItemsTarget, DataElementsFromTarget, HdcaDataItemsFromTarget, FtpImportTarget, ] ]
[docs]class FetchDataPayload(BaseDataPayload): targets: Targets
[docs]class FetchDataFormPayload(BaseDataPayload): targets: Union[Json[Targets], Targets]