Source code for galaxy.schema.fetch_data

import json
from enum import Enum
from typing import (
    List,
    Optional,
    Union,
)

from pydantic import (
    Extra,
    Field,
    Json,
    validator,
)
from typing_extensions import (
    Annotated,
    Literal,
)

from galaxy.schema.fields import DecodedDatabaseIdField
from galaxy.schema.schema import (
    HistoryIdField,
    Model,
)


# Common base class for the fetch-data models declared in this module.
class FetchBaseModel(Model):
    class Config:
        # Let a field be populated by its own name as well as by its alias
        # (several models below alias ``items`` to ``elements``).
        allow_population_by_field_name = True
# Values accepted by the ``items_from`` / ``elements_from`` fields below.
class ElementsFromType(str, Enum):
    archive = "archive"
    bagit = "bagit"
    bagit_archive = "bagit_archive"
    directory = "directory"
# Shared field definition (default ``False``) reused by the models in this
# module that expose an ``auto_decompress`` attribute.
AutoDecompressField = Field(False, description="Decompress compressed data before sniffing?")
# Base class for fetch targets; contributes only the ``auto_decompress`` option.
class BaseFetchDataTarget(FetchBaseModel):
    auto_decompress: bool = AutoDecompressField
# Values accepted by ``ItemsFromModel.src``.
class ItemsFromSrc(str, Enum):
    url = "url"
    files = "files"
    path = "path"
    ftp_import = "ftp_import"
    server_dir = "server_dir"
# Values accepted by ``ExtraFiles.src``; the same strings appear as the
# ``src`` literals of the element models below.
class Src(str, Enum):
    url = "url"
    pasted = "pasted"
    files = "files"
    path = "path"
    composite = "composite"
    ftp_import = "ftp_import"
    server_dir = "server_dir"
# Destination kinds.
# NOTE(review): this enum spells the collection kind "hdcas", whereas
# HdcaDestination.type is Literal["hdca"] -- confirm which spelling callers rely on.
class DestinationType(str, Enum):
    library = "library"
    library_folder = "library_folder"
    hdcas = "hdcas"
    hdas = "hdas"
# Destination whose ``type`` must be the literal "hdas".
class HdaDestination(FetchBaseModel):
    type: Literal["hdas"]
# Destination whose ``type`` must be the literal "hdca".
class HdcaDestination(FetchBaseModel):
    type: Literal["hdca"]
# Destination whose ``type`` is "library_folder"; identifies the folder by ID.
class LibraryFolderDestination(FetchBaseModel):
    type: Literal["library_folder"]
    # For some reason this folder ID must NOT have the 'F' prefix
    library_folder_id: DecodedDatabaseIdField
# Base for targets that use an HdcaDestination; everything except
# ``destination`` is optional.
class BaseCollectionTarget(BaseFetchDataTarget):
    destination: HdcaDestination
    collection_type: Optional[str] = None
    tags: Optional[List[str]] = None
    name: Optional[str] = None
# Destination whose ``type`` is "library". ``name`` is required;
# ``description`` and ``synopsis`` are optional free-text fields.
class LibraryDestination(FetchBaseModel):
    type: Literal["library"]
    name: str = Field(..., description="Must specify a library name")
    description: Optional[str] = Field(None, description="Description for library to create")
    # The help text here describes the synopsis itself rather than repeating
    # the wording used for ``description``.
    synopsis: Optional[str] = Field(None, description="Synopsis for library to create")
# Model for the ``extra_files`` attribute of a data element.
class ExtraFiles(FetchBaseModel):
    items_from: Optional[str] = None
    src: Src
    # NOTE(review): the default is True while the description reads as the
    # effect of disabling the option -- confirm the intended wording.
    fuzzy_root: Optional[bool] = Field(
        True,
        description="Prevent Galaxy from checking for a single file in a directory and re-interpreting the archive",
    )
# Attributes shared by every data element; subclasses add a ``src`` literal
# plus their source-specific fields.
class BaseDataElement(FetchBaseModel):
    name: Optional[str] = None
    dbkey: str = Field("?")
    info: Optional[str] = None
    ext: str = Field("auto")
    space_to_tab: bool = False
    to_posix_lines: bool = False
    deferred: bool = False
    tags: Optional[List[str]] = None
    created_from_basename: Optional[str] = None
    extra_files: Optional[ExtraFiles] = None
    auto_decompress: bool = AutoDecompressField
    # Exposed externally under the alias ``elements_from``.
    items_from: Optional[ElementsFromType] = Field(None, alias="elements_from")
    collection_type: Optional[str] = None
    MD5: Optional[str] = None
    description: Optional[str] = None

    class Config:
        # reject unknown extra attributes
        extra = Extra.forbid
# Data element whose ``src`` is "files"; adds no fields of its own.
class FileDataElement(BaseDataElement):
    src: Literal["files"]
# Data element whose ``src`` is "pasted"; ``paste_content`` is required.
class PastedDataElement(BaseDataElement):
    src: Literal["pasted"]
    paste_content: str = Field(..., description="Content to upload")
# Data element whose ``src`` is "url"; ``url`` is required.
class UrlDataElement(BaseDataElement):
    src: Literal["url"]
    url: str = Field(..., description="URL to upload")
# Data element whose ``src`` is "server_dir"; ``server_dir`` is required.
class ServerDirElement(BaseDataElement):
    src: Literal["server_dir"]
    server_dir: str
    link_data_only: Optional[bool] = None
# Data element whose ``src`` is "ftp_import"; ``ftp_path`` is required.
class FtpImportElement(BaseDataElement):
    src: Literal["ftp_import"]
    ftp_path: str
    # The next three fields repeat declarations that BaseDataElement already
    # provides with the same types.
    items_from: Optional[ElementsFromType] = Field(None, alias="elements_from")
    name: Optional[str] = None
    collection_type: Optional[str] = None
# Mixin describing where items come from: a required ``src`` plus one
# optional location field per source kind.
class ItemsFromModel(Model):
    src: ItemsFromSrc
    path: Optional[str] = None
    ftp_path: Optional[str] = None
    server_dir: Optional[str] = None
    url: Optional[str] = None
# Collection target whose ``src`` is "ftp_import"; ``ftp_path`` is required.
class FtpImportTarget(BaseCollectionTarget):
    src: Literal["ftp_import"]
    ftp_path: str
    items_from: Optional[ElementsFromType] = Field(None, alias="elements_from")
# Data element whose ``src`` is "path"; ``path`` is required.
class PathDataElement(BaseDataElement):
    src: Literal["path"]
    path: str
    # Repeats the declaration inherited from BaseDataElement.
    items_from: Optional[ElementsFromType] = Field(None, alias="elements_from")
    link_data_only: Optional[bool] = None
# Data element whose ``src`` is "composite".
class CompositeDataElement(BaseDataElement):
    src: Literal["composite"]
    # Forward reference: CompositeItems is declared after this class.
    composite: "CompositeItems"
# Container for the members of a composite element; ``items`` is required
# and exposed under the alias ``elements``.
class CompositeItems(FetchBaseModel):
    items: List[
        Union[
            FileDataElement,
            PastedDataElement,
            UrlDataElement,
            PathDataElement,
            ServerDirElement,
            FtpImportElement,
        ]
    ] = Field(..., alias="elements")
# Resolve the "CompositeItems" forward reference now that the class exists.
CompositeDataElement.update_forward_refs()
# Element that holds further elements; ``items`` (alias ``elements``) may
# contain plain elements or more NestedElement instances.
class NestedElement(BaseDataElement):
    # Both names are forward references resolved after AnyElement is defined.
    items: List[Union["AnyElement", "NestedElement"]] = Field(..., alias="elements")
# Union of the concrete element models, discriminated on ``src``.
AnyElement = Annotated[
    Union[
        FileDataElement,
        PastedDataElement,
        UrlDataElement,
        PathDataElement,
        ServerDirElement,
        FtpImportElement,
        CompositeDataElement,
    ],
    Field(default_factory=None, discriminator="src"),
]


# Seems to be a bug in pydantic ... can't reuse AnyElement in more than one model
AnyElement2 = Annotated[
    Union[
        FileDataElement,
        PastedDataElement,
        UrlDataElement,
        PathDataElement,
        ServerDirElement,
        FtpImportElement,
        CompositeDataElement,
    ],
    Field(default_factory=None, discriminator="src"),
]

# Resolve the "AnyElement" / "NestedElement" forward references used by
# NestedElement.items.
NestedElement.update_forward_refs()
# Base for targets that write to an HDA, library-folder or library
# destination; the destination's ``type`` selects the concrete model.
class BaseDataTarget(BaseFetchDataTarget):
    destination: Union[
        HdaDestination,
        LibraryFolderDestination,
        LibraryDestination,
    ] = Field(..., discriminator="type")
# Data target carrying an explicit element list (alias ``elements``).
class DataElementsTarget(BaseDataTarget):
    items: List[Union[AnyElement, NestedElement]] = Field(..., alias="elements")
# Data target combined with the ItemsFromModel source fields; ``items_from``
# (alias ``elements_from``) is required.
class DataElementsFromTarget(BaseDataTarget, ItemsFromModel):
    items_from: ElementsFromType = Field(..., alias="elements_from")
# Collection target carrying an explicit element list (alias ``elements``).
class HdcaDataItemsTarget(BaseCollectionTarget):
    # Uses AnyElement2 rather than AnyElement; see the note where AnyElement2
    # is defined.
    items: List[Union[AnyElement2, NestedElement]] = Field(..., alias="elements")
# Collection target combined with the ItemsFromModel source fields;
# ``items_from`` (alias ``elements_from``) is required.
class HdcaDataItemsFromTarget(BaseCollectionTarget, ItemsFromModel):
    items_from: ElementsFromType = Field(..., alias="elements_from")
# Pair of required strings: ``filename`` and ``local_filename``.
class FilesPayload(Model):
    filename: str
    local_filename: str
# Shared part of the fetch payloads: the history ID, permissive handling of
# extra attributes, and JSON decoding of a string-valued ``targets``.
class BaseDataPayload(FetchBaseModel):
    history_id: DecodedDatabaseIdField = HistoryIdField

    class Config:
        # file payloads are just tacked on, so we need to allow everything
        extra = Extra.allow

    # ``targets`` is declared on the subclasses rather than here, hence
    # check_fields=False. pre=True makes this run before type validation.
    @validator("targets", pre=True, check_fields=False)
    def targets_string_to_json(cls, v):
        # Strings are parsed as JSON; any other value passes through untouched.
        return json.loads(v) if isinstance(v, str) else v
# List type used by the ``targets`` field of the payload models.
Targets = List[
    Union[
        DataElementsTarget,
        HdcaDataItemsTarget,
        DataElementsFromTarget,
        HdcaDataItemsFromTarget,
        FtpImportTarget,
    ]
]
# Payload whose required ``targets`` is a Targets list.
class FetchDataPayload(BaseDataPayload):
    targets: Targets
# Payload variant whose ``targets`` may be a JSON-encoded string or an
# already-decoded Targets list.
class FetchDataFormPayload(BaseDataPayload):
    targets: Union[Json[Targets], Targets]  # type: ignore[type-arg]  # https://github.com/samuelcolvin/pydantic/issues/2990