from typing import Union, Annotated from fastapi import Depends, HTTPException from fastapi.security import OAuth2PasswordBearer, HTTPBearer, HTTPAuthorizationCredentials from jose import jwt, JWTError from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from starlette import status import backend.config from backend.session import get_session from models import User from services.base import BaseService from schemas.auth import * oauth2_schema = HTTPBearer() algorithm = 'HS256' async def get_current_user(session: Annotated[AsyncSession, Depends(get_session)], token: Annotated[HTTPAuthorizationCredentials, Depends(oauth2_schema)]) -> User | None: if not token.credentials: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid token') try: payload = jwt.decode(token.credentials, backend.config.SECRET_KEY, algorithms=[algorithm]) user_id = payload.get('sub') if not user_id: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail='Invalid credentials') user_id = int(user_id) user = await session.get(User, user_id) if not user: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid credentials') return user except JWTError as e: print(e) raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail='Invalid token') class AuthService(BaseService): @staticmethod def _generate_jwt_token(user: User) -> str: payload = { 'sub': str(user.id) } return jwt.encode(payload, backend.config.SECRET_KEY, algorithm=algorithm) async def authenticate(self, request: AuthLoginRequest): user: Union[User, None] = await self.session.scalar(select(User).where(User.telegram_id == request.id)) if not user: user = User(telegram_id=request.id, is_admin=False) self.session.add(user) await self.session.commit() access_token = self._generate_jwt_token(user) return AuthLoginResponse(access_token=access_token)