Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.webapps.galaxy.api.uploads

"""
API operations for uploaded files in storage.
"""
import logging
import os
import re

from galaxy.exceptions import MessageException, NotImplemented
from galaxy.web import expose_api_anonymous
from galaxy.web.base.controller import BaseAPIController

log = logging.getLogger(__name__)


class UploadsAPIController(BaseAPIController):
    """API controller that appends uploaded chunks to a per-session file."""

    # Number of bytes read from the incoming chunk per iteration while it is
    # copied to the target file (64 KiB).
    READ_CHUNK_SIZE = 2 ** 16

    @expose_api_anonymous
    def index(self, trans, **kwd):
        """Listing uploads is not supported; always raises ``NotImplemented``."""
        raise NotImplemented("Listing uploads is not implemented.")

    @expose_api_anonymous
    def create(self, trans, payload, **kwd):
        """
        POST /api/uploads/

        Append one uploaded chunk to the file that belongs to an upload
        session. The target file lives in ``trans.app.config.new_file_path``
        and is named after the session id.

        Keys read from ``payload``:

        * ``session_id``    - name of the session; must consist only of word
          characters and ``-`` because it is used as a file name.
        * ``session_start`` - offset at which this chunk starts; must equal
          the current size of the target file (0 when it does not exist yet).
        * ``session_chunk`` - object exposing the uploaded data through a
          ``file`` attribute.

        :returns: ``{"message": "Successful."}`` once the chunk was appended.
        :raises MessageException: when the session id, session start or
            session chunk is missing or invalid, when ``session_start`` does
            not match the current size of the target file, or when the chunk
            is larger than ``trans.app.config.chunk_upload_size``.
        """
        session_id = payload.get("session_id")
        session_start = payload.get("session_start")
        session_chunk = payload.get("session_chunk")
        chunk_file = getattr(session_chunk, "file", None)
        try:
            # ``\Z`` anchors at the very end of the string; ``$`` would also
            # accept an id with a trailing newline, which must not end up in
            # a file name. A missing or non-string id makes ``re.match`` raise
            # TypeError, which is reported as an invalid id rather than a crash.
            try:
                valid_session_id = re.match(r'^[\w-]+\Z', session_id) is not None
            except TypeError:
                valid_session_id = False
            if not valid_session_id:
                raise MessageException("Requires a session id.")
            if session_start is None:
                raise MessageException("Requires a session start.")
            if chunk_file is None:
                raise MessageException("Requires a session chunk.")
            target_file = os.path.join(trans.app.config.new_file_path, session_id)
            target_size = 0
            if os.path.exists(target_file):
                target_size = os.path.getsize(target_file)
            # NOTE(review): this comparison assumes ``session_start`` arrives
            # as an integer; a string value would never compare equal -
            # confirm against the payload parsing done by the caller.
            if session_start != target_size:
                raise MessageException("Incorrect session start.")
            chunk_size = os.fstat(chunk_file.fileno()).st_size
            if chunk_size > trans.app.config.chunk_upload_size:
                raise MessageException("Invalid chunk size.")
            with open(target_file, "ab") as f:
                while True:
                    read_chunk = chunk_file.read(self.READ_CHUNK_SIZE)
                    if not read_chunk:
                        break
                    f.write(read_chunk)
        finally:
            # Release the uploaded chunk's handle on every exit path, not
            # only after a successful append.
            if chunk_file is not None:
                chunk_file.close()
        return {"message": "Successful."}