114 lines
3.1 KiB
Python
114 lines
3.1 KiB
Python
from typing import Annotated, Union
|
|
|
|
from fastapi import APIRouter, Depends
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
import utils.dependecies
|
|
from backend.session import get_session
|
|
from schemas.base import PaginationSchema
|
|
from schemas.product import *
|
|
from services.auth import get_current_user
|
|
from services.product import ProductService
|
|
|
|
# Router for all product endpoints, mounted under the "/product" prefix and
# grouped under the "product" tag. The router-level dependency means every
# route registered here also depends on `get_current_user` (presumably the
# authentication check from services.auth -- confirm there).
product_router = APIRouter(
    dependencies=[Depends(get_current_user)],
    tags=["product"],
    prefix="/product",
)
|
|
|
|
|
|
# POST /product/create
# Hands the validated request body to ProductService.create and returns the
# service result, serialized as ProductCreateResponse.
# (Documented with comments rather than a docstring: FastAPI would publish a
# docstring as the operation description in the OpenAPI schema.)
@product_router.post(
    '/create',
    operation_id='create_product',
    response_model=ProductCreateResponse,
)
async def create_product(
    request: ProductCreateRequest,
    session: Annotated[AsyncSession, Depends(get_session)],
):
    service = ProductService(session)
    return await service.create(request)
|
|
|
|
|
|
# POST /product/delete
# Hands the validated request body to ProductService.delete and returns the
# service result, serialized as ProductDeleteResponse.
# (Documented with comments rather than a docstring: FastAPI would publish a
# docstring as the operation description in the OpenAPI schema.)
@product_router.post(
    '/delete',
    operation_id='delete_product',
    response_model=ProductDeleteResponse,
)
async def delete_product(
    request: ProductDeleteRequest,
    session: Annotated[AsyncSession, Depends(get_session)],
):
    service = ProductService(session)
    return await service.delete(request)
|
|
|
|
|
|
# POST /product/update
# Hands the validated request body to ProductService.update and returns the
# service result, serialized as ProductUpdateResponse.
#
# The handler is named `update_product` to match its operation_id. It was
# previously a second `delete_product`, which shadowed the real delete handler
# at module level and made FastAPI derive the route name and default summary
# ("Delete Product") from the wrong function name.
# (Documented with comments rather than a docstring: FastAPI would publish a
# docstring as the operation description in the OpenAPI schema.)
@product_router.post(
    '/update',
    response_model=ProductUpdateResponse,
    operation_id='update_product',
)
async def update_product(
    request: ProductUpdateRequest,
    session: Annotated[AsyncSession, Depends(get_session)],
):
    return await ProductService(session).update(request)
|
|
|
|
|
|
# GET /product/get
# Returns the products for `client_id` via ProductService.get_by_client_id,
# serialized as ProductGetResponse. `client_id` is a plain `int` parameter that
# is not part of the path, so FastAPI reads it from the query string; the
# pagination object is built by the `pagination_parameters` dependency.
# (Documented with comments rather than a docstring: FastAPI would publish a
# docstring as the operation description in the OpenAPI schema.)
@product_router.get(
    '/get',
    operation_id='get_products_by_client_id',
    response_model=ProductGetResponse,
)
async def get_product(
    client_id: int,
    pagination: Annotated[
        PaginationSchema,
        Depends(utils.dependecies.pagination_parameters),
    ],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    service = ProductService(session)
    return await service.get_by_client_id(client_id, pagination)
|
|
|
|
|
|
# GET /product/get-by-id
# Looks up one product via ProductService.get_by_id and returns the service
# result, serialized as ProductSchema. `product_id` is a plain `int` parameter
# that is not part of the path, so FastAPI reads it from the query string.
# (Documented with comments rather than a docstring: FastAPI would publish a
# docstring as the operation description in the OpenAPI schema.)
@product_router.get(
    '/get-by-id',
    operation_id='get_product_by_id',
    response_model=ProductSchema,
)
async def get_product_by_id(
    product_id: int,
    session: Annotated[AsyncSession, Depends(get_session)],
):
    service = ProductService(session)
    return await service.get_by_id(product_id)
|
|
|
|
|
|
# POST /product/barcode/add
# Hands the validated request body to ProductService.add_barcode and returns
# the service result, serialized as ProductAddBarcodeResponse.
# (Documented with comments rather than a docstring: FastAPI would publish a
# docstring as the operation description in the OpenAPI schema.)
@product_router.post(
    '/barcode/add',
    operation_id='add_product_barcode',
    response_model=ProductAddBarcodeResponse,
)
async def add_product_barcode(
    request: ProductAddBarcodeRequest,
    session: Annotated[AsyncSession, Depends(get_session)],
):
    service = ProductService(session)
    return await service.add_barcode(request)
|
|
|
|
|
|
# GET /product/barcode/exists
# Passes `product_id` and `barcode` to ProductService.exists_barcode and
# returns the service result, serialized as ProductExistsBarcodeResponse.
# Neither parameter is part of the path, so FastAPI reads both from the query
# string.
# (Documented with comments rather than a docstring: FastAPI would publish a
# docstring as the operation description in the OpenAPI schema.)
@product_router.get(
    '/barcode/exists',
    operation_id='exists_product_barcode',
    response_model=ProductExistsBarcodeResponse,
)
async def exists_product_barcode(
    product_id: int,
    barcode: str,
    session: Annotated[AsyncSession, Depends(get_session)],
):
    service = ProductService(session)
    return await service.exists_barcode(product_id, barcode)
|
|
|
|
|
|
# POST /product/barcode/generate
# Hands the validated request body to ProductService.generate_barcode and
# returns the service result, serialized as ProductGenerateBarcodeResponse.
# (Documented with comments rather than a docstring: FastAPI would publish a
# docstring as the operation description in the OpenAPI schema.)
@product_router.post(
    '/barcode/generate',
    operation_id='generate_product_barcode',
    response_model=ProductGenerateBarcodeResponse,
)
async def generate_product_barcode(
    request: ProductGenerateBarcodeRequest,
    session: Annotated[AsyncSession, Depends(get_session)],
):
    service = ProductService(session)
    return await service.generate_barcode(request)
|