Source code for csle_rest_api.resources.login.routes

"""
Routes and sub-resources for the /login resource
"""
from typing import Tuple
import json
import secrets
import time
import bcrypt
import csle_common.constants.constants as constants
from csle_common.dao.management.session_token import SessionToken
from csle_common.metastore.metastore_facade import MetastoreFacade
from flask import Blueprint, jsonify, request, Response
import csle_rest_api.constants.constants as api_constants

# Creates a blueprint "sub application" of the main REST app
login_bp = Blueprint(api_constants.MGMT_WEBAPP.LOGIN_RESOURCE, __name__,
                     url_prefix=f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.LOGIN_RESOURCE}")


[docs]@login_bp.route("", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def read_login() -> Tuple[Response, int]: """ The /login resource :return: Authenticates the login information and returns the result """ token = secrets.token_urlsafe(32) json_data = json.loads(request.data) if api_constants.MGMT_WEBAPP.USERNAME_PROPERTY not in json_data: response_str = f"{api_constants.MGMT_WEBAPP.USERNAME_PROPERTY} not provided" return (jsonify({api_constants.MGMT_WEBAPP.REASON_PROPERTY: response_str}), constants.HTTPS.BAD_REQUEST_STATUS_CODE) username = json_data[api_constants.MGMT_WEBAPP.USERNAME_PROPERTY] user_account = MetastoreFacade.get_management_user_by_username(username=username) response_code = constants.HTTPS.UNAUTHORIZED_STATUS_CODE response = jsonify({}) if user_account is not None: if api_constants.MGMT_WEBAPP.PASSWORD_PROPERTY not in json_data: response_str = f"{api_constants.MGMT_WEBAPP.PASSWORD_PROPERTY} not provided" return (jsonify({api_constants.MGMT_WEBAPP.REASON_PROPERTY: response_str}), constants.HTTPS.BAD_REQUEST_STATUS_CODE) password = json_data[api_constants.MGMT_WEBAPP.PASSWORD_PROPERTY] byte_pwd = password.encode("utf-8") pw_hash = bcrypt.hashpw(byte_pwd, user_account.salt.encode("utf-8")).decode("utf-8") if user_account.password == pw_hash: response_code = constants.HTTPS.OK_STATUS_CODE new_token = MetastoreFacade.get_session_token_by_username(username=username) ts = time.time() if new_token is None: new_token = SessionToken(token=token, username=username, timestamp=ts) MetastoreFacade.save_session_token(session_token=new_token) else: new_token.timestamp = ts MetastoreFacade.update_session_token( session_token=new_token, token=new_token.token ) data_dict = { api_constants.MGMT_WEBAPP.TOKEN_PROPERTY: new_token.token, api_constants.MGMT_WEBAPP.ADMIN_PROPERTY: user_account.admin, api_constants.MGMT_WEBAPP.USERNAME_PROPERTY: user_account.username, api_constants.MGMT_WEBAPP.FIRST_NAME_PROPERTY: user_account.first_name, api_constants.MGMT_WEBAPP.LAST_NAME_PROPERTY: user_account.last_name, api_constants.MGMT_WEBAPP.ORGANIZATION_PROPERTY: user_account.organization, api_constants.MGMT_WEBAPP.EMAIL_PROPERTY: user_account.email, api_constants.MGMT_WEBAPP.ID_PROPERTY: user_account.id, } response = jsonify(data_dict) response.headers.add( api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*" ) return response, response_code