61 lines
		
	
	
		
			2.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			61 lines
		
	
	
		
			2.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import logging
from typing import Union, Annotated

from fastapi import Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from starlette import status

import backend.config
import constants
from backend.session import get_session
from models import User
from schemas.auth import *
from services.base import BaseService
 | 
						|
 | 
						|
# Bearer-token security scheme; injected as a dependency by get_current_user to obtain the request's credentials.
oauth2_schema = HTTPBearer()
 | 
						|
# JWT signing algorithm; the same value is used for both decoding and encoding tokens in this module.
algorithm = 'HS256'
 | 
						|
 | 
						|
 | 
						|
async def get_current_user(session: Annotated[AsyncSession, Depends(get_session)],
                           token: Annotated[HTTPAuthorizationCredentials, Depends(oauth2_schema)]) -> User | None:
    """Resolve the bearer token of the current request to a ``User``.

    The token is decoded with ``backend.config.SECRET_KEY`` and the module
    level ``algorithm``; its ``sub`` claim is interpreted as the integer
    primary key of the user, which is then loaded through ``session``.

    Args:
        session: Database session used to load the user.
        token: Bearer credentials extracted from the request.

    Returns:
        The ``User`` identified by the token. The function never actually
        returns ``None``; every failure path raises instead.

    Raises:
        HTTPException: 401 when the token is empty, cannot be decoded, has a
            non-integer ``sub`` claim, or refers to a user that does not
            exist; 400 when the ``sub`` claim is missing or empty.
    """
    if not token.credentials:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid token')

    # Keep the try body minimal: only decoding can raise JWTError.
    try:
        payload = jwt.decode(token.credentials, backend.config.SECRET_KEY, algorithms=[algorithm])
    except JWTError as e:
        logging.getLogger(__name__).warning('JWT validation failed: %s', e)
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid token') from e

    user_id = payload.get('sub')
    if not user_id:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid credentials')

    # A validly signed token may still carry a subject that is not an integer;
    # reject it as an invalid token instead of letting ValueError become a 500.
    try:
        user_id = int(user_id)
    except (TypeError, ValueError) as e:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid token') from e

    user = await session.get(User, user_id)
    if not user:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid credentials')
    return user
 | 
						|
 | 
						|
 | 
						|
class AuthService(BaseService):
    """Service that logs users in by Telegram id and issues JWT access tokens.

    Uses ``self.session`` for database access; the attribute is presumably
    provided by ``BaseService`` (not visible in this module).
    """

    @staticmethod
    def _generate_jwt_token(user: User) -> str:
        """Return a signed JWT whose ``sub`` claim is the user's id as a string.

        The token is signed with ``backend.config.SECRET_KEY`` using the
        module-level ``algorithm``.
        """
        # NOTE(review): the payload carries no ``exp`` claim, so issued tokens
        # do not expire on their own - confirm this is intended.
        payload = {
            'sub': str(user.id),
        }
        return jwt.encode(payload, backend.config.SECRET_KEY, algorithm=algorithm)

    async def authenticate(self, request: AuthLoginRequest) -> AuthLoginResponse:
        """Authenticate a login request and return an access token.

        The request's ``id`` must be listed in
        ``constants.allowed_telegram_ids``. If no ``User`` with that
        ``telegram_id`` exists yet, a non-admin one is created and committed.

        Args:
            request: Login payload; only its ``id`` field is read here.

        Returns:
            An ``AuthLoginResponse`` carrying the freshly generated token.

        Raises:
            HTTPException: 401 when ``request.id`` is not an allowed id.
        """
        if request.id not in constants.allowed_telegram_ids:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid credentials')

        user: User | None = await self.session.scalar(select(User).where(User.telegram_id == request.id))
        if user is None:
            user = User(telegram_id=request.id,
                        is_admin=False)
            self.session.add(user)
            await self.session.commit()
            # commit() may expire the instance (expire_on_commit defaults to
            # True); reload it explicitly so reading ``user.id`` below does not
            # trigger implicit IO, which is not allowed on an AsyncSession.
            await self.session.refresh(user)

        access_token = self._generate_jwt_token(user)
        return AuthLoginResponse(access_token=access_token)
 |