Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy_test.api.test_users
from galaxy_test.api._framework import ApiTestCase
from galaxy_test.base.api_asserts import assert_object_id_error
from galaxy_test.base.decorators import (
requires_admin,
requires_new_history,
requires_new_user,
)
from galaxy_test.base.populators import (
DatasetPopulator,
skip_without_tool,
)
TEST_USER_EMAIL = "user_for_users_index_test@bx.psu.edu"
TEST_USER_EMAIL_INDEX_DELETED = "user_for_users_index_deleted_test@bx.psu.edu"
TEST_USER_EMAIL_DELETE = "user_for_delete_test@bx.psu.edu"
TEST_USER_EMAIL_DELETE_CANCEL_JOBS = "user_for_delete_cancel_jobs_test@bx.psu.edu"
TEST_USER_EMAIL_PURGE = "user_for_purge_test@bx.psu.edu"
TEST_USER_EMAIL_UNDELETE = "user_for_undelete_test@bx.psu.edu"
TEST_USER_EMAIL_SHOW = "user_for_show_test@bx.psu.edu"
[docs]class TestUsersApi(ApiTestCase):
[docs] @requires_admin
@requires_new_user
def test_index(self):
user = self._setup_user(TEST_USER_EMAIL_INDEX_DELETED)
all_users_response = self._get("users", admin=True)
self._assert_status_code_is(all_users_response, 200)
all_users = all_users_response.json()
# New user is in list
assert len([u for u in all_users if u["email"] == TEST_USER_EMAIL_INDEX_DELETED]) == 1
# Request made from admin user, so should at least self and this
# new user.
assert len(all_users) > 1
# index of deleted users
self._delete(f"users/{user['id']}", admin=True)
all_deleted_users_response_1 = self._get("users/deleted", admin=True)
self._assert_status_code_is(all_deleted_users_response_1, 200)
payload = {"deleted": "True"}
all_deleted_users_response_2 = self._get("users", data=payload, admin=True)
self._assert_status_code_is(all_deleted_users_response_2, 200)
# user is in list of deleted users
all_deleted_users = all_deleted_users_response_1.json()
assert len([u for u in all_deleted_users if u["email"] == TEST_USER_EMAIL_INDEX_DELETED]) == 1
all_deleted_users = all_deleted_users_response_2.json()
assert len([u for u in all_deleted_users if u["email"] == TEST_USER_EMAIL_INDEX_DELETED]) == 1
[docs] @requires_new_user
def test_index_only_self_for_nonadmins(self):
self._setup_user(TEST_USER_EMAIL)
with self._different_user():
all_users_response = self._get("users")
# Non admin users can only see themselves
assert len(all_users_response.json()) == 1
[docs] @requires_new_user
def test_show(self):
user = self._setup_user(TEST_USER_EMAIL)
with self._different_user(email=TEST_USER_EMAIL):
show_response = self.__show(user)
self._assert_status_code_is(show_response, 200)
self.__assert_matches_user(user, show_response.json())
[docs] @requires_new_user
def test_update(self):
payload = {"username": "linnaeus"}
user = self._setup_user(TEST_USER_EMAIL)
not_the_user = self._setup_user("email@example.com")
with self._different_user(email=TEST_USER_EMAIL):
# working
update_response = self.__update(user, data=payload)
self._assert_status_code_is(update_response, 200)
update_json = update_response.json()
assert update_json["username"] == payload["username"]
# too short
update_response = self.__update(user, data={"username": ""})
self._assert_status_code_is(update_response, 400)
# not them
update_response = self.__update(not_the_user, data=payload)
self._assert_status_code_is(update_response, 400)
# non-existent
no_user_id = "5d7db0757a2eb7ef"
update_url = self._api_url(f"users/{no_user_id}")
update_response = self._put(update_url, data=payload, json=True)
assert_object_id_error(update_response)
[docs] @requires_admin
@requires_new_user
def test_admin_update(self):
payload = {"username": "flexo"}
user = self._setup_user(TEST_USER_EMAIL)
update_url = self._api_url(f"users/{user['id']}")
update_response = self._put(update_url, data=payload, admin=True, json=True)
self._assert_status_code_is(update_response, 200)
update_json = update_response.json()
assert update_json["username"] == payload["username"]
[docs] @requires_admin
@requires_new_user
def test_delete_user(self):
user = self._setup_user(TEST_USER_EMAIL_DELETE)
self._delete(f"users/{user['id']}", admin=True)
updated_user = self._get(f"users/deleted/{user['id']}", admin=True).json()
assert updated_user["deleted"] is True, updated_user
[docs] @requires_admin
@requires_new_user
def test_purge_user(self):
"""Delete user and then purge them."""
user = self._setup_user(TEST_USER_EMAIL_PURGE)
response = self._delete(f"users/{user['id']}", admin=True)
self._assert_status_code_is_ok(response)
params = dict(purge="True")
response = self._delete(f"users/{user['id']}", params=params, admin=True, json=True)
self._assert_status_code_is_ok(response)
params = {"deleted": "True"}
purged_user = self._get(f"users/{user['id']}", params, admin=True).json()
assert purged_user["deleted"] is True, purged_user
assert purged_user["purged"] is True, purged_user
[docs] @requires_admin
@requires_new_user
def test_undelete_user(self):
"""Delete user and then undelete them."""
user = self._setup_user(TEST_USER_EMAIL_UNDELETE)
self._delete(f"users/{user['id']}", admin=True)
payload = {"deleted": "True"}
deleted_user = self._get(f"users/{user['id']}", payload, admin=True).json()
assert deleted_user["deleted"] is True, deleted_user
self._post(f"users/deleted/{user['id']}/undelete", admin=True)
undeleted_user = self._get(f"users/{user['id']}", admin=True).json()
assert undeleted_user["deleted"] is False, undeleted_user
[docs] @requires_admin
@requires_new_user
@requires_new_history
@skip_without_tool("cat_data_and_sleep")
def test_delete_user_cancel_all_jobs(self):
dataset_populator = DatasetPopulator(self.galaxy_interactor)
with self._different_user(TEST_USER_EMAIL_DELETE_CANCEL_JOBS):
user_id = self._get_current_user_id()
history_id = dataset_populator.new_history()
hda_id = dataset_populator.new_dataset(history_id)["id"]
inputs = {
"input1": {"src": "hda", "id": hda_id},
"sleep_time": 6000,
}
run_response = dataset_populator.run_tool_raw(
"cat_data_and_sleep",
inputs,
history_id,
)
self._assert_status_code_is_ok(run_response)
job_id = run_response.json()["jobs"][0]["id"]
# Wait a bit for the job to be ready
expected_job_states = ["new", "queued", "running"]
dataset_populator.wait_for_job(job_id, ok_states=expected_job_states)
# Get the job state
job_response = self._get(f"jobs/{job_id}").json()
assert job_response["state"] in expected_job_states, job_response
# Delete user will cancel all jobs
self._delete(f"users/{user_id}", admin=True)
# Get the job state again (this time as admin), it should be deleting
job_response = self._get(f"jobs/{job_id}", admin=True).json()
assert job_response["state"] == "deleting", job_response
[docs] @requires_new_user
def test_information(self):
user = self._setup_user(TEST_USER_EMAIL)
url = self.__url("information/inputs", user)
response = self._get(url).json()
assert response["username"] == user["username"]
assert response["email"] == TEST_USER_EMAIL
payload = {"username": "newname", "email": "new@email.email"}
self._put(url, data=payload, json=True)
response = self._get(url).json()
assert response["username"] == "newname"
assert response["email"] == "new@email.email"
payload = {"username": user["username"], "email": TEST_USER_EMAIL}
self._put(url, data=payload, json=True)
response = self._get(url).json()
assert response["username"] == user["username"]
assert response["email"] == TEST_USER_EMAIL
self._put(url, data={"address_0|desc": "_desc"}, json=True)
response = self._get(url).json()
assert len(response["addresses"]) == 1
assert response["addresses"][0]["desc"] == "_desc"
[docs] @requires_new_user
def test_manage_api_key(self):
with self._different_user("manage-api-key-test@user.com"):
user_id = self._get_current_user_id()
# Initially we have an API key because it is bootstrapped for tests
response = self._get(f"users/{user_id}/api_key")
user_api_key = response.json()
assert user_api_key
# Test detailed endpoint
response = self._get(f"users/{user_id}/api_key/detailed")
api_key = response.json()
assert api_key["key"] == user_api_key
# Delete user API key
response = self._delete(f"users/{user_id}/api_key")
self._assert_status_code_is(response, 204)
# No API key anymore, so the detailed request returns no content 204 with admin key
response = self._get(f"users/{user_id}/api_key/detailed", admin=True)
self._assert_status_code_is(response, 204)
# No API key anymore, so the detailed request returns unauthorized 401 with user key
response = self._get(f"users/{user_id}/api_key/detailed")
self._assert_status_code_is(response, 401)
# create new as admin
response = self._post(f"users/{user_id}/api_key", admin=True)
self._assert_status_code_is_ok(response)
new_api_key = response.json()
assert new_api_key
assert new_api_key != user_api_key
[docs] @requires_new_user
def test_only_admin_can_manage_other_users_api_key(self):
with self._different_user():
other_user_id = self._get_current_user_id()
current_user_id = self._get_current_user_id()
# Users cannot access other users API keys
assert current_user_id != other_user_id
response = self._get(f"users/{other_user_id}/api_key")
self._assert_status_code_is(response, 403)
response = self._post(f"users/{other_user_id}/api_key")
self._assert_status_code_is(response, 403)
response = self._delete(f"users/{other_user_id}/api_key")
self._assert_status_code_is(response, 403)
# Admins can access other users API keys
response = self._get(f"users/{other_user_id}/api_key", admin=True)
self._assert_status_code_is_ok(response)
response = self._post(f"users/{other_user_id}/api_key", admin=True)
self._assert_status_code_is_ok(response)
response = self._delete(f"users/{other_user_id}/api_key", admin=True)
self._assert_status_code_is_ok(response)
[docs] @requires_admin
@requires_new_user
@skip_without_tool("cat1")
def test_favorites(self):
user = self._setup_user(TEST_USER_EMAIL)
# adding a tool to favorites
url = self._api_url(f"users/{user['id']}/favorites/tools")
put_response = self._put(url, data={"object_id": "cat1"}, admin=True, json=True)
self._assert_status_code_is_ok(put_response)
assert put_response.json()["tools"][0] == "cat1"
# not implemented for workflows yet
url = self._api_url(f"users/{user['id']}/favorites/workflows")
put_response = self._put(url, data={"object_id": "14ds68f4sda68gf46dsag4"}, admin=True, json=True)
self._assert_status_code_is(put_response, 400)
# delete existing tool favorite
url = self._api_url(f"users/{user['id']}/favorites/tools/cat1")
delete_response = self._delete(url, admin=True)
self._assert_status_code_is_ok(delete_response)
assert delete_response.json()["tools"] == []
# delete non-existing tool favorite
url = self._api_url(f"users/{user['id']}/favorites/tools/madeuptoolthatdoes/not/exist/in/favs")
delete_response = self._delete(url, admin=True)
self._assert_status_code_is(delete_response, 404)
# delete non existing workflow favorite
url = self._api_url(f"users/{user['id']}/favorites/workflows/1as5das5das56d465")
delete_response = self._delete(url, admin=True)
self._assert_status_code_is(delete_response, 400)
[docs] @skip_without_tool("cat1")
def test_search_favorites(self):
user, user_key = self._setup_user_get_key(TEST_USER_EMAIL)
url = self._api_url(f"users/{user['id']}/favorites/tools", params=dict(key=user_key))
fav_response = self._put(url, data={"object_id": "cat1"}, json=True)
self._assert_status_code_is_ok(fav_response)
assert "cat1" in fav_response.json()["tools"]
url = self._api_url("tools", params=dict(q="#favs", key=user_key))
search_response = self._get(url).json()
assert "cat1" in search_response
[docs] @requires_new_user
def test_set_theme(self):
user = self._setup_user(TEST_USER_EMAIL)
with self._different_user(email=TEST_USER_EMAIL):
url = self._api_url(f"users/{user['id']}/theme/test_theme")
theme_response = self._put(url)
self._assert_status_code_is_ok(theme_response)
url = self._api_url("users/current")
updated_theme = self._get(url).json()["preferences"]["theme"]
assert updated_theme == "test_theme"
[docs] @requires_admin
@requires_new_user
def test_show_delete(self):
user = self._setup_user(TEST_USER_EMAIL_SHOW)
url = self._api_url(f"users/{user['id']}")
response_1 = self._get(url, admin=True).json()
self._delete(f"users/{user['id']}", admin=True)
# Both request should return the same user
response_2 = self._get(f"users/deleted/{user['id']}", admin=True).json()
payload = {"deleted": "True"}
response_3 = self._get(f"users/{user['id']}", payload, admin=True).json()
assert response_1["id"] == response_2["id"] == response_3["id"]
assert response_2 == response_3
[docs] def test_show_current(self):
user_id = self._get_current_user_id()
url = self._api_url(f"users/{user_id}")
specified_user = self._get(url).json()
url = self._api_url("users/current")
current_user = self._get(url).json()
assert specified_user == current_user
def __url(self, action, user):
return self._api_url(f"users/{user['id']}/{action}", params=dict(key=self.master_api_key))
def __show(self, user):
return self._get(f"users/{user['id']}")
def __update(self, user, data):
update_url = self._api_url(f"users/{user['id']}")
return self._put(update_url, data=data, json=True)
def __assert_matches_user(self, userA, userB):
self._assert_has_keys(userB, "id", "username", "total_disk_usage")
assert userA["id"] == userB["id"]
assert userA["username"] == userB["username"]
def _get_current_user_id(self):
users_response = self._get("users")
users = users_response.json()
assert len(users) == 1
return users[0]["id"]
[docs] def test_manage_beacon_settings(self):
user_id = self._get_current_user_id()
# Assert that beacon sharing is initially disabled
response = self._get(f"users/{user_id}/beacon")
user_beacon_settings = response.json()
assert not user_beacon_settings["enabled"]
# Check if post request is successful
response = self._post(f"users/{user_id}/beacon", data={"enabled": True}, json=True)
user_beacon_settings = response.json()
assert user_beacon_settings["enabled"]
# Check if the setting has been persisted
response = self._get(f"users/{user_id}/beacon")
user_beacon_settings = response.json()
assert user_beacon_settings["enabled"]