Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.webapps.galaxy.api.uploads
"""
API operations for uploaded files in storage.
"""
import logging
import os
import re
from galaxy import exceptions
from galaxy.web import legacy_expose_api_anonymous
from . import BaseGalaxyAPIController
log = logging.getLogger(__name__)
[docs]class UploadsAPIController(BaseGalaxyAPIController):
READ_CHUNK_SIZE = 2 ** 16
[docs] @legacy_expose_api_anonymous
def index(self, trans, **kwd):
raise exceptions.NotImplemented("Listing uploads is not implemented.")
[docs] @legacy_expose_api_anonymous
def create(self, trans, payload, **kwd):
"""
POST /api/uploads/
"""
session_id = payload.get("session_id")
session_start = payload.get("session_start")
session_chunk = payload.get("session_chunk")
if re.match(r'^[\w-]+$', session_id) is None:
raise exceptions.MessageException("Requires a session id.")
if session_start is None:
raise exceptions.MessageException("Requires a session start.")
if not hasattr(session_chunk, "file"):
raise exceptions.MessageException("Requires a session chunk.")
target_file = os.path.join(trans.app.config.new_file_path, session_id)
target_size = 0
if os.path.exists(target_file):
target_size = os.path.getsize(target_file)
if session_start != target_size:
raise exceptions.MessageException("Incorrect session start.")
chunk_size = os.fstat(session_chunk.file.fileno()).st_size
if chunk_size > trans.app.config.chunk_upload_size:
raise exceptions.MessageException("Invalid chunk size.")
with open(target_file, "ab") as f:
while True:
read_chunk = session_chunk.file.read(self.READ_CHUNK_SIZE)
if not read_chunk:
break
f.write(read_chunk)
session_chunk.file.close()
return {"message": "Successful."}