Files
Fulfillment-Backend/services/auth.py
2025-02-04 07:46:31 +03:00

93 lines
3.3 KiB
Python

from typing import Union, Annotated
from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from starlette import status
import backend.config
import constants
from backend.session import get_session
from enums.user import UserRole
from models import User
from schemas.auth import *
from services.base import BaseService
# Bearer-token security scheme; injected through Depends() in get_current_user
# to obtain the credentials sent with the request.
oauth2_schema = HTTPBearer()
# Signing algorithm used for both jwt.encode and jwt.decode in this module.
algorithm = 'HS256'
async def get_current_user(
        session: Annotated[AsyncSession, Depends(get_session)],
        token: Annotated[HTTPAuthorizationCredentials, Depends(oauth2_schema)]
) -> Union[User, None, dict]:
    """Resolve the request's bearer token to the principal it represents.

    The token is decoded with ``backend.config.SECRET_KEY``. A token whose
    ``sub`` claim is ``'guest'`` yields the raw JWT payload (a ``dict``);
    any other ``sub`` is treated as a numeric user id and looked up in the
    database.

    Args:
        session: Database session used to load the user.
        token: Bearer credentials extracted from the request.

    Returns:
        The ``User`` row for regular tokens, or the decoded payload ``dict``
        for guest tokens.

    Raises:
        HTTPException: 401 when the token is empty, cannot be decoded, carries
            a non-numeric ``sub``, or refers to a missing, deleted or blocked
            user; 400 when the token has no ``sub`` claim.
    """
    if not token.credentials:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid token')

    # Only the decode call can raise JWTError, so keep the try body minimal.
    try:
        payload = jwt.decode(token.credentials, backend.config.SECRET_KEY, algorithms=[algorithm])
    except JWTError as e:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid token') from e

    user_id = payload.get('sub')
    if not user_id:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid credentials')
    if user_id == 'guest':
        return payload

    # A correctly signed token may still carry a non-numeric subject; report
    # it as an invalid token instead of letting ValueError become a 500.
    try:
        user_id = int(user_id)
    except (TypeError, ValueError) as e:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid token') from e

    user = await session.get(User, user_id)
    # Mirror the login-time policy in AuthService.authenticate: tokens issued
    # before a user was deleted or blocked must stop working as well.
    if not user or user.is_deleted or user.is_blocked:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid credentials')
    return user
async def authorized_user(
        user: Annotated[User, Depends(get_current_user)]
):
    """Dependency that admits only real users, rejecting guest tokens.

    Args:
        user: Result of ``get_current_user``; either a ``User`` or, for guest
            tokens, the decoded payload ``dict``.

    Returns:
        The authenticated ``User``.

    Raises:
        HTTPException: 401 when the principal is not a ``User``.
    """
    # isinstance (rather than an exact type comparison) also accepts mapped
    # subclasses of User.
    if isinstance(user, User):
        return user
    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid token')
async def guest_user(user: Annotated[User, Depends(get_current_user)]):
    """Dependency that admits both real users and guest-token holders.

    Args:
        user: Result of ``get_current_user``; either a ``User`` or, for guest
            tokens, the decoded payload ``dict``.

    Returns:
        The ``User`` or the guest payload ``dict``, unchanged.

    Raises:
        HTTPException: 401 when the principal is neither a ``User`` nor a
            ``dict``.
    """
    if isinstance(user, (User, dict)):
        return user
    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid token')
class AuthService(BaseService):
    """Issues JWT access tokens for Telegram users and for deal guests."""

    @staticmethod
    def _generate_jwt_token(payload: dict) -> str:
        """Return ``payload`` encoded as a JWT signed with the configured secret key."""
        secret_key = backend.config.SECRET_KEY
        return jwt.encode(payload, secret_key, algorithm=algorithm)

    async def authenticate(self, request: AuthLoginRequest):
        """Log a Telegram user in, creating the account on first login.

        Args:
            request: Login request; ``request.id`` is matched against
                ``constants.allowed_telegram_ids`` and ``User.telegram_id``.

        Returns:
            ``AuthLoginResponse`` carrying a token whose claims are the user's
            id (``sub``, as a string) and role key (``role``).

        Raises:
            HTTPException: 401 when the Telegram id is not allowed, or the
                matching user is deleted or blocked.
        """
        telegram_id = request.id
        if telegram_id not in constants.allowed_telegram_ids:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid credentials')

        lookup = select(User).where(User.telegram_id == telegram_id)
        user: Union[User, None] = await self.session.scalar(lookup)

        if not user:
            # First login for this Telegram id: register a regular, non-admin user.
            user = User(
                telegram_id=telegram_id,
                is_admin=False,
                role_key=UserRole.user,
            )
            self.session.add(user)
            await self.session.commit()
        elif user.is_deleted or user.is_blocked:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid credentials')

        # NOTE(review): user.id is read after commit() for new users; presumably
        # the session does not expire attributes on commit - confirm.
        claims = {
            'sub': str(user.id),
            'role': user.role_key,
        }
        return AuthLoginResponse(access_token=self._generate_jwt_token(claims))

    def create_deal_guest_token(self, deal_id: int):
        """Return a signed guest token (``sub == 'guest'``) bound to ``deal_id``."""
        guest_claims = {'sub': 'guest', 'deal_id': deal_id}
        return self._generate_jwt_token(guest_claims)