from typing import (
    Any,
    Dict,
    Optional,
)
from galaxy.exceptions import error_codes
from galaxy_test.base.api_asserts import (
    assert_error_code_is,
    assert_has_keys,
    assert_status_code_is,
)
from galaxy_test.base.decorators import requires_admin
from galaxy_test.base.populators import DatasetPopulator
from ._framework import ApiTestCase
[docs]
class TestRolesApi(ApiTestCase):
    dataset_populator: DatasetPopulator
[docs]
    def setUp(self):
        super().setUp()
        self.dataset_populator = DatasetPopulator(self.galaxy_interactor) 
[docs]
    @requires_admin
    def test_list_and_show(self):
        def check_roles_response(response):
            assert_status_code_is(response, 200)
            as_list = response.json()
            assert isinstance(as_list, list)
            assert len(as_list) > 0
            for role in as_list:
                self.check_role_dict(role)
        user_role_id = self.dataset_populator.user_private_role_id()
        with self._different_user():
            different_user_role_id = self.dataset_populator.user_private_role_id()
        admin_roles_response = self._get("roles", admin=True)
        user_roles_response = self._get("roles")
        check_roles_response(admin_roles_response)
        check_roles_response(user_roles_response)
        admin_roles_response_ids = [r["id"] for r in admin_roles_response.json()]
        user_roles_response_ids = [r["id"] for r in user_roles_response.json()]
        # User can see their own private role not the other users, admin can see both.
        assert user_role_id in user_roles_response_ids
        assert different_user_role_id not in user_roles_response_ids
        assert user_role_id in admin_roles_response_ids
        assert different_user_role_id in admin_roles_response_ids
        # Check showing a valid, role.
        role_response = self._get(f"roles/{user_role_id}")
        assert_status_code_is(role_response, 200)
        role = role_response.json()
        self.check_role_dict(role, assert_id=user_role_id) 
[docs]
    @requires_admin
    def test_create_invalid_params(self):
        # In theory these low-level validation test cases could be handled in more
        # of a unit test style but it makes sense during the transition from wsgi to
        # asgi to have some tests that validate the whole pipeline is being integrated
        # properly in terms of exception handling.
        # Test missing description
        name = self.dataset_populator.get_random_name()
        description = "A test role."
        payload = {
            "name": name,
            "user_ids": [self.dataset_populator.user_id()],
        }
        response = self._post("roles", payload, admin=True, json=True)
        assert_status_code_is(response, 400)
        assert_error_code_is(response, error_codes.error_codes_by_name["USER_REQUEST_MISSING_PARAMETER"].code)
        assert "description" in response.json()["err_msg"]
        # Test missing name
        payload_missing_name = {
            "description": description,
            "user_ids": [self.dataset_populator.user_id()],
        }
        response = self._post("roles", payload_missing_name, admin=True, json=True)
        assert_status_code_is(response, 400)
        assert_error_code_is(response, error_codes.error_codes_by_name["USER_REQUEST_MISSING_PARAMETER"].code)
        assert "name" in response.json()["err_msg"]
        # Test invalid type for name
        payload_invalid_type = {
            "name": ["a test", "name"],
            "description": description,
            "user_ids": [self.dataset_populator.user_id()],
        }
        response = self._post("roles", payload_invalid_type, admin=True, json=True)
        assert_status_code_is(response, 400)
        assert_error_code_is(response, error_codes.error_codes_by_name["USER_REQUEST_INVALID_PARAMETER"].code)
        assert "name" in response.json()["err_msg"]
        assert "validation_errors" in response.json() 
[docs]
    @requires_admin
    def test_create_valid(self):
        name = self.dataset_populator.get_random_name()
        description = "A test role."
        role = self._create_role(name=name, description=description)
        assert role["name"] == name
        assert role["description"] == description
        user_roles_response = self._get("roles")
        with self._different_user():
            different_user_roles_response = self._get("roles")
        user_roles_response_ids = [r["id"] for r in user_roles_response.json()]
        different_user_roles_response_ids = [r["id"] for r in different_user_roles_response.json()]
        # This new role is public, all users see it.
        assert role["id"] in user_roles_response_ids
        assert role["id"] in different_user_roles_response_ids 
[docs]
    @requires_admin
    def test_show_error_codes(self):
        # Bad role ids are 400.
        response = self._get("roles/badroleid")
        assert_status_code_is(response, 400)
        # Trying to access others roles raise (not found) error
        with self._different_user():
            different_user_role_id = self.dataset_populator.user_private_role_id()
        response = self._get(f"roles/{different_user_role_id}")
        assert_status_code_is(response, 404) 
[docs]
    @requires_admin
    def test_create_only_admin(self):
        response = self._post("roles", json=True)
        assert_status_code_is(response, 403)
        response_err = response.json()
        assert response_err["err_code"] == 403006
        assert "administrator" in response_err["err_msg"] 
[docs]
    @requires_admin
    def test_delete(self):
        role = self._create_role()
        role_id = role["id"]
        response = self._delete(f"roles/{role_id}", admin=True)
        assert_status_code_is(response, 200) 
[docs]
    @requires_admin
    def test_delete_duplicating_name_raises_409(self):
        role = self._create_role()
        role_id = role["id"]
        role_name = role["name"]
        delete_response = self._delete(f"roles/{role_id}", admin=True)
        self._assert_status_code_is_ok(delete_response)
        # Create a new role with the same name as the deleted one is not allowed
        payload = self._build_valid_role_payload(role_name)
        response = self._post("roles", payload, admin=True, json=True)
        self._assert_status_code_is(response, 409) 
[docs]
    @requires_admin
    def test_purge(self):
        role = self._create_role()
        role_id = role["id"]
        # Delete and purge the role
        delete_response = self._delete(f"roles/{role_id}", admin=True)
        self._assert_status_code_is_ok(delete_response)
        purge_response = self._post(f"roles/{role_id}/purge", admin=True)
        self._assert_status_code_is_ok(purge_response)
        # The role is deleted and purged, so it cannot be found
        response = self._get(f"roles/{role_id}", admin=True)
        self._assert_status_code_is(response, 404) 
[docs]
    @requires_admin
    def test_purge_can_reuse_name(self):
        role = self._create_role()
        role_id = role["id"]
        role_name = role["name"]
        # Delete and purge the role
        delete_response = self._delete(f"roles/{role_id}", admin=True)
        self._assert_status_code_is_ok(delete_response)
        purge_response = self._post(f"roles/{role_id}/purge", admin=True)
        self._assert_status_code_is_ok(purge_response)
        # Create a new role with the same name as the deleted one is allowed
        payload = self._build_valid_role_payload(role_name)
        response = self._post("roles", payload, admin=True, json=True)
        self._assert_status_code_is(response, 200) 
    def _create_role(self, name: Optional[str] = None, description: Optional[str] = None) -> Dict[str, Any]:
        payload = self._build_valid_role_payload(name=name, description=description)
        response = self._post("roles", payload, admin=True, json=True)
        assert_status_code_is(response, 200)
        role = response.json()
        self.check_role_dict(role)
        return role
    def _build_valid_role_payload(self, name: Optional[str] = None, description: Optional[str] = None):
        name = name or self.dataset_populator.get_random_name()
        description = description or f"A test role with name: {name}."
        payload = {
            "name": name,
            "description": description,
            "user_ids": [self.dataset_populator.user_id()],
        }
        return payload
[docs]
    @staticmethod
    def check_role_dict(role_dict: Dict[str, Any], assert_id: Optional[str] = None) -> None:
        assert_has_keys(role_dict, "id", "name", "model_class", "url")
        assert role_dict["model_class"] == "Role"
        if assert_id is not None:
            assert role_dict["id"] == assert_id