Source code for csle_rest_api.resources.users.routes

"""
Routes and sub-resources for the /users resource
"""
from typing import Tuple
import json
import bcrypt
import csle_common.constants.constants as constants
from csle_common.dao.management.management_user import ManagementUser
from csle_common.metastore.metastore_facade import MetastoreFacade
from flask import Blueprint, jsonify, request, Response
import csle_rest_api.constants.constants as api_constants
import csle_rest_api.util.rest_api_util as rest_api_util

# Creates a blueprint "sub application" of the main REST app
users_bp = Blueprint(api_constants.MGMT_WEBAPP.USERS_RESOURCE, __name__,
                     url_prefix=f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.USERS_RESOURCE}")


[docs]@users_bp.route("", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_GET, api_constants.MGMT_WEBAPP.HTTP_REST_DELETE]) def users() -> Tuple[Response, int]: """ The /users resource. :return: A list of management users or a list of ids of the users or deletes all users """ requires_admin = True authorized = rest_api_util.check_if_user_is_authorized( request=request, requires_admin=requires_admin ) if authorized is not None: return authorized if request.method == api_constants.MGMT_WEBAPP.HTTP_REST_GET: # Check if ids query parameter is True, then only return the ids and not the whole list of users ids = request.args.get(api_constants.MGMT_WEBAPP.IDS_QUERY_PARAM) if ids is not None and ids: return users_ids() users = MetastoreFacade.list_management_users() for i in range(len(users)): users[i].password = "" users[i].salt = "" users_dicts = list(map(lambda x: x.to_dict(), users)) response = jsonify(users_dicts) response.headers.add( api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*" ) return response, constants.HTTPS.OK_STATUS_CODE elif request.method == api_constants.MGMT_WEBAPP.HTTP_REST_DELETE: users = MetastoreFacade.list_management_users() for user in users: MetastoreFacade.remove_management_user(management_user=user) response = jsonify({}) response.headers.add( api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*" ) return response, constants.HTTPS.OK_STATUS_CODE return (jsonify({api_constants.MGMT_WEBAPP.REASON_PROPERTY: "HTTP method not supported"}), constants.HTTPS.BAD_REQUEST_STATUS_CODE)
[docs]def users_ids() -> Tuple[Response, int]: """ Gets the user ids :return: An HTTP response with all user ids """ user_ids = MetastoreFacade.list_management_users_ids() response_dicts = [] for tup in user_ids: response_dicts.append({api_constants.MGMT_WEBAPP.ID_PROPERTY: tup[0]}) response = jsonify(response_dicts) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE
[docs]@users_bp.route(f"{constants.COMMANDS.SLASH_DELIM}<user_id>", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_GET, api_constants.MGMT_WEBAPP.HTTP_REST_DELETE, api_constants.MGMT_WEBAPP.HTTP_REST_PUT]) def user(user_id: int) -> Tuple[Response, int]: """ The /users/id resource. :param user_id: the id of the user :return: The given user or deletes the user """ # Check that token is valid authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=False) if authorized is not None: return authorized # Check if user is admin or is editing his/hers own account user = MetastoreFacade.get_management_user_config(id=int(user_id)) if user is None: response = jsonify({}) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.NOT_FOUND_STATUS_CODE request_user = rest_api_util.check_if_user_edit_is_authorized(request=request, user=user) if not isinstance(request_user, ManagementUser): if request_user is not None: return request_user else: return (jsonify({api_constants.MGMT_WEBAPP.REASON_PROPERTY: f"User: {user} not found"}), constants.HTTPS.BAD_REQUEST_STATUS_CODE) response = jsonify({}) if user is not None: if request.method == api_constants.MGMT_WEBAPP.HTTP_REST_GET: user.password = "" user.salt = "" response = jsonify(user.to_dict()) elif request.method == api_constants.MGMT_WEBAPP.HTTP_REST_PUT: new_user = json.loads(request.data) if not ( api_constants.MGMT_WEBAPP.USERNAME_PROPERTY in new_user and api_constants.MGMT_WEBAPP.PASSWORD_PROPERTY in new_user and api_constants.MGMT_WEBAPP.FIRST_NAME_PROPERTY in new_user and api_constants.MGMT_WEBAPP.LAST_NAME_PROPERTY in new_user and api_constants.MGMT_WEBAPP.ORGANIZATION_PROPERTY in new_user and api_constants.MGMT_WEBAPP.ID_PROPERTY in new_user and api_constants.MGMT_WEBAPP.EMAIL_PROPERTY in new_user and api_constants.MGMT_WEBAPP.ADMIN_PROPERTY in new_user ): if api_constants.MGMT_WEBAPP.PASSWORD_PROPERTY in new_user: new_user[api_constants.MGMT_WEBAPP.PASSWORD_PROPERTY] = "" if api_constants.MGMT_WEBAPP.SALT_PROPOERTY in new_user: new_user[api_constants.MGMT_WEBAPP.SALT_PROPOERTY] = "" response = jsonify(new_user) return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE new_user = ManagementUser.from_dict(new_user) if new_user.password == "": new_user.password = user.password new_user.salt = user.salt else: byte_pwd = new_user.password.encode("utf-8") salt = bcrypt.gensalt() pw_hash = bcrypt.hashpw(byte_pwd, salt) new_user.salt = salt.decode("utf-8") new_user.password = pw_hash.decode("utf-8") # A user cannot be given admin rights by a non-admin user if not request_user.admin: new_user.admin = False # An admin user cannot remove its own admin rights if request_user.admin and user.admin and request_user.username == user.username: new_user.admin = True MetastoreFacade.update_management_user(management_user=new_user, id=user_id) new_user.salt = "" new_user.password = "" user.salt = "" user.password = "" response = jsonify(user.to_dict()) else: MetastoreFacade.remove_management_user(management_user=user) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE
[docs]@users_bp.route(f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.CREATE_SUBRESOURCE}", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def create_user() -> Tuple[Response, int]: """ The /users/create resource. :return: creates a new user """ response = jsonify({}) if constants.CONFIG_FILE.PARSED_CONFIG is not None and constants.CONFIG_FILE.PARSED_CONFIG.allow_registration: json_data = json.loads(request.data) if (api_constants.MGMT_WEBAPP.USERNAME_PROPERTY in json_data and api_constants.MGMT_WEBAPP.PASSWORD_PROPERTY in json_data and api_constants.MGMT_WEBAPP.FIRST_NAME_PROPERTY in json_data and api_constants.MGMT_WEBAPP.LAST_NAME_PROPERTY in json_data and api_constants.MGMT_WEBAPP.EMAIL_PROPERTY in json_data and api_constants.MGMT_WEBAPP.ORGANIZATION_PROPERTY in json_data and api_constants.MGMT_WEBAPP.ADMIN_PROPERTY in json_data): username = json_data[api_constants.MGMT_WEBAPP.USERNAME_PROPERTY] password = json_data[api_constants.MGMT_WEBAPP.PASSWORD_PROPERTY] first_name = json_data[api_constants.MGMT_WEBAPP.FIRST_NAME_PROPERTY] last_name = json_data[api_constants.MGMT_WEBAPP.LAST_NAME_PROPERTY] organization = json_data[ api_constants.MGMT_WEBAPP.ORGANIZATION_PROPERTY ] email = json_data[api_constants.MGMT_WEBAPP.EMAIL_PROPERTY] if password == "" or username == "": error_reason = "Password or username cannot be empty." response = jsonify({api_constants.MGMT_WEBAPP.REASON_PROPERTY: error_reason}) return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE else: usernames = list(map(lambda x: x.username, MetastoreFacade.list_management_users())) if username in usernames: error_reason = "A user with that username already exists" response = jsonify({api_constants.MGMT_WEBAPP.REASON_PROPERTY: error_reason}) return response, constants.HTTPS.CONFLICT_STATUS_CODE byte_pwd = password.encode("utf-8") salt = bcrypt.gensalt() pw_hash = bcrypt.hashpw(byte_pwd, salt) user = ManagementUser(username=username, password=pw_hash.decode("utf-8"), admin=False, salt=salt.decode("utf-8"), first_name=first_name, last_name=last_name, organization=organization, email=email) MetastoreFacade.save_management_user(management_user=user) user.salt = "" user.password = "" response = jsonify(user.to_dict()) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE else: return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE else: return response, constants.HTTPS.UNAUTHORIZED_STATUS_CODE