from datetime import datetime from typing import Union, Annotated from fastapi import Depends, HTTPException from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from jose import jwt, JWTError from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from starlette import status import backend.config import constants from backend.session import get_session from constants import DEALS_VIEWER, DEAL_EDITOR from enums.user import UserRole from models import User, InviteCode from schemas.auth import * from services.base import BaseService oauth2_schema = HTTPBearer() algorithm = 'HS256' async def get_current_user( session: Annotated[AsyncSession, Depends(get_session)], token: Annotated[HTTPAuthorizationCredentials, Depends(oauth2_schema)] ) -> Optional[UserUnion]: if not token.credentials: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid token') try: payload = jwt.decode(token.credentials, backend.config.SECRET_KEY, algorithms=[algorithm]) user_id = payload.get('sub') if not user_id: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid credentials') if user_id == DEAL_EDITOR or user_id == DEALS_VIEWER: return payload user_id = int(user_id) user: Optional[User] = await session.get(User, user_id) if not user: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid credentials') if user.is_deleted or user.is_blocked: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Пользователь заблокирован или удален') return user except JWTError as e: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid token') async def authorized_user( user: Annotated[User, Depends(get_current_user)] ): if type(user) is User: return user raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid token') async def guest_user(user: Annotated[UserUnion, Depends(get_current_user)]): if (type(user) is User) or (type(user) is dict): return user raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid token') async def user_deals_viewer(user: Annotated[UserUnion, Depends(get_current_user)]): if (type(user) is User) or (type(user) is dict and user['sub'] == DEALS_VIEWER): return user raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid token') async def user_deal_editor(user: Annotated[UserUnion, Depends(get_current_user)]): if (type(user) is User) or (type(user) is dict and user['sub'] == DEAL_EDITOR): return user raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid token') class AuthService(BaseService): @staticmethod def _generate_jwt_token(payload: dict) -> str: return jwt.encode(payload, backend.config.SECRET_KEY, algorithm=algorithm) async def authenticate(self, request: AuthLoginRequest): user: Optional[User] = await self.session.scalar(select(User).where(User.telegram_id == request.id)) if user and (user.is_deleted or user.is_blocked): raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Пользователь заблокирован или удален') if not user and request.invite_code: invite_code = await self.session.scalar( select( InviteCode ) .where( InviteCode.code == request.invite_code, InviteCode.is_activated == False ) ) if not invite_code: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Неверный код приглашения') # check if code is expired delta = datetime.now() - invite_code.created_at if delta.seconds >= constants.INVITE_CODE_EXPIRY: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Код приглашения устарел') user = await self.session.get(User, invite_code.user_id) if not user: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Пользователь не найден') user.telegram_id = request.id await self.session.flush() invite_code.is_activated = True await self.session.commit() if not user: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Пользователь не найден') payload = { 'sub': str(user.id), 'role': user.role_key, } access_token = self._generate_jwt_token(payload) return AuthLoginResponse(access_token=access_token) def create_deal_guest_token(self, deal_id: int): payload: UserDealEditor = { 'sub': constants.DEAL_EDITOR, 'deal_id': deal_id } return self._generate_jwt_token(payload) def create_client_guest_token(self, client_id: int): payload: UserViewer = { 'sub': constants.DEALS_VIEWER, 'client_id': client_id } return self._generate_jwt_token(payload)