Files

130 lines
5.3 KiB
Python

from datetime import datetime
from typing import Annotated
from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from starlette import status
import backend.config
import constants
from backend.session import get_session
from constants import DEALS_VIEWER, DEAL_EDITOR
from models import InviteCode
from schemas.auth import *
from services.base import BaseService
# Bearer-credentials extractor injected via Depends() into get_current_user.
# NOTE: despite the name, this is a plain HTTPBearer scheme, not an OAuth2 flow.
oauth2_schema = HTTPBearer()
# JWT signing algorithm passed to both jwt.encode and jwt.decode in this module.
algorithm = 'HS256'
async def get_current_user(
    session: Annotated[AsyncSession, Depends(get_session)],
    token: Annotated[HTTPAuthorizationCredentials, Depends(oauth2_schema)]
) -> UserUnion:
    """Resolve the caller from the bearer JWT.

    The token is decoded with ``backend.config.SECRET_KEY`` and the
    module-level ``algorithm``. Its ``sub`` claim selects the outcome:

    * ``sub`` equal to ``DEAL_EDITOR`` or ``DEALS_VIEWER``: the decoded
      payload dict is returned unchanged (guest token).
    * otherwise ``sub`` is parsed as an integer primary key and the matching
      ``User`` row is loaded and returned.

    Raises:
        HTTPException(401): empty credentials, undecodable token, ``sub``
            that is not an integer, unknown user, or a user flagged
            ``is_deleted`` / ``is_blocked``.
        HTTPException(400): token decoded but carries no ``sub`` claim.
    """
    if not token.credentials:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid token')

    # Only the decode step can raise JWTError, so keep the try body minimal.
    try:
        payload = jwt.decode(token.credentials, backend.config.SECRET_KEY, algorithms=[algorithm])
    except JWTError as exc:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid token') from exc

    subject = payload.get('sub')
    if not subject:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid credentials')

    # Guest tokens carry a role marker instead of a user id; hand back the raw claims.
    if subject == DEAL_EDITOR or subject == DEALS_VIEWER:
        return payload

    # A validly signed token with a non-numeric subject must be rejected as
    # unauthorized rather than crash with an unhandled ValueError (HTTP 500).
    try:
        user_id = int(subject)
    except (TypeError, ValueError) as exc:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid token') from exc

    user: Optional[User] = await session.get(User, user_id)
    if not user:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid credentials')
    if user.is_deleted or user.is_blocked:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Пользователь заблокирован или удален')
    return user
async def authorized_user(
    user: Annotated[User, Depends(get_current_user)]
):
    """Dependency that admits only a principal whose exact type is ``User``.

    Anything else produced by ``get_current_user`` (e.g. a guest-token
    payload dict) is rejected with HTTP 401 ``'Invalid token'``.
    """
    if type(user) is not User:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid token')
    return user
async def guest_user(user: Annotated[UserUnion, Depends(get_current_user)]):
    """Dependency that admits a ``User`` or any guest-token payload dict.

    Raises HTTP 401 ``'Invalid token'`` when the resolved principal is
    neither exactly a ``User`` nor exactly a ``dict``.
    """
    principal_type = type(user)
    if principal_type is not User and principal_type is not dict:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid token')
    return user
async def user_deals_viewer(user: Annotated[UserUnion, Depends(get_current_user)]):
    """Dependency that admits a ``User`` or a guest payload whose ``sub`` is ``DEALS_VIEWER``.

    Raises HTTP 401 ``'Invalid token'`` for every other principal.
    """
    principal_type = type(user)
    if principal_type is User:
        return user
    # Guest payloads are plain dicts; only the viewer marker is accepted here.
    if principal_type is dict and user['sub'] == DEALS_VIEWER:
        return user
    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid token')
async def user_deal_editor(user: Annotated[UserUnion, Depends(get_current_user)]):
    """Dependency that admits a ``User`` or a guest payload whose ``sub`` is ``DEAL_EDITOR``.

    Raises HTTP 401 ``'Invalid token'`` for every other principal.
    """
    principal_type = type(user)
    if principal_type is User:
        return user
    # Guest payloads are plain dicts; only the editor marker is accepted here.
    if principal_type is dict and user['sub'] == DEAL_EDITOR:
        return user
    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid token')
class AuthService(BaseService):
    """Issues JWTs for Telegram-authenticated users and for guest access."""

    @staticmethod
    def _generate_jwt_token(payload: dict) -> str:
        """Sign ``payload`` with ``backend.config.SECRET_KEY`` using the module-level ``algorithm``.

        NOTE(review): no ``exp`` claim is added anywhere in this class, so
        issued tokens carry no expiry unless the caller supplies one.
        """
        return jwt.encode(payload, backend.config.SECRET_KEY, algorithm=algorithm)

    async def authenticate(self, request: AuthLoginRequest):
        """Log a user in by Telegram id, optionally redeeming an invite code.

        Flow:
          1. Look up a ``User`` by ``telegram_id == request.id``.
          2. If none is found and ``request.invite_code`` is set, redeem the
             code: it must exist, be not yet activated and be younger than
             ``constants.INVITE_CODE_EXPIRY`` seconds. The user it points to
             gets ``telegram_id`` bound and the code is marked activated.
          3. Return an ``AuthLoginResponse`` with a token whose claims are
             ``sub`` (the user id as a string) and ``role``.

        Raises:
            HTTPException(401): user deleted/blocked, invalid or expired
                invite code, or no user could be resolved.
        """
        user: Optional[User] = await self.session.scalar(
            select(User).where(User.telegram_id == request.id)
        )
        if user and (user.is_deleted or user.is_blocked):
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Пользователь заблокирован или удален')

        if not user and request.invite_code:
            invite_code = await self.session.scalar(
                select(InviteCode).where(
                    InviteCode.code == request.invite_code,
                    # SQLAlchemy column expression: `==` builds the SQL
                    # comparison, so it must not be rewritten as `not ...`.
                    InviteCode.is_activated == False,  # noqa: E712
                )
            )
            if not invite_code:
                raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Неверный код приглашения')

            # Expiry check. `timedelta.seconds` is only the 0..86399 component
            # and ignores whole days, so a code older than 24h could appear
            # fresh; `total_seconds()` measures the full age.
            # NOTE(review): datetime.now() is naive - assumes created_at is
            # stored as naive local time; confirm against the model.
            age = datetime.now() - invite_code.created_at
            if age.total_seconds() >= constants.INVITE_CODE_EXPIRY:
                raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Код приглашения устарел')

            user = await self.session.get(User, invite_code.user_id)
            if not user:
                raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Пользователь не найден')
            # Apply the same deleted/blocked gate as the telegram_id path
            # before binding the account or consuming the invite code.
            if user.is_deleted or user.is_blocked:
                raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Пользователь заблокирован или удален')

            user.telegram_id = request.id
            await self.session.flush()
            invite_code.is_activated = True
            await self.session.commit()

        if not user:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Пользователь не найден')

        payload = {
            'sub': str(user.id),
            'role': user.role_key,
        }
        access_token = self._generate_jwt_token(payload)
        return AuthLoginResponse(access_token=access_token)

    def create_deal_guest_token(self, deal_id: int):
        """Return a signed guest token with ``sub`` = ``constants.DEAL_EDITOR`` and the given ``deal_id``."""
        payload: UserDealEditor = {
            'sub': constants.DEAL_EDITOR,
            'deal_id': deal_id
        }
        return self._generate_jwt_token(payload)

    def create_client_guest_token(self, client_id: int):
        """Return a signed guest token with ``sub`` = ``constants.DEALS_VIEWER`` and the given ``client_id``."""
        payload: UserViewer = {
            'sub': constants.DEALS_VIEWER,
            'client_id': client_id
        }
        return self._generate_jwt_token(payload)