128 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			128 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from typing import Annotated, Union
 | 
						|
 | 
						|
from fastapi import APIRouter, Depends
 | 
						|
from sqlalchemy.ext.asyncio import AsyncSession
 | 
						|
 | 
						|
import utils.dependecies
 | 
						|
from backend.session import get_session
 | 
						|
from schemas.barcode import GetProductBarcodeResponse, GetProductBarcodeRequest
 | 
						|
from schemas.base import PaginationSchema
 | 
						|
from schemas.product import *
 | 
						|
from services.auth import get_current_user
 | 
						|
from services.barcode import BarcodeService
 | 
						|
from services.product import ProductService
 | 
						|
 | 
						|
product_router = APIRouter(
 | 
						|
    prefix="/product",
 | 
						|
    tags=["product"],
 | 
						|
    dependencies=[Depends(get_current_user)]
 | 
						|
)
 | 
						|
 | 
						|
 | 
						|
@product_router.post(
 | 
						|
    '/create',
 | 
						|
    response_model=ProductCreateResponse,
 | 
						|
    operation_id='create_product'
 | 
						|
)
 | 
						|
async def create_product(
 | 
						|
        request: ProductCreateRequest,
 | 
						|
        session: Annotated[AsyncSession, Depends(get_session)]
 | 
						|
):
 | 
						|
    return await ProductService(session).create(request)
 | 
						|
 | 
						|
 | 
						|
@product_router.post(
 | 
						|
    '/delete',
 | 
						|
    response_model=ProductDeleteResponse,
 | 
						|
    operation_id='delete_product'
 | 
						|
)
 | 
						|
async def delete_product(
 | 
						|
        request: ProductDeleteRequest,
 | 
						|
        session: Annotated[AsyncSession, Depends(get_session)]
 | 
						|
):
 | 
						|
    return await ProductService(session).delete(request)
 | 
						|
 | 
						|
 | 
						|
@product_router.post(
 | 
						|
    '/update',
 | 
						|
    response_model=ProductUpdateResponse,
 | 
						|
    operation_id='update_product'
 | 
						|
)
 | 
						|
async def delete_product(
 | 
						|
        request: ProductUpdateRequest,
 | 
						|
        session: Annotated[AsyncSession, Depends(get_session)]
 | 
						|
):
 | 
						|
    return await ProductService(session).update(request)
 | 
						|
 | 
						|
 | 
						|
@product_router.get(
 | 
						|
    '/get',
 | 
						|
    response_model=ProductGetResponse,
 | 
						|
    operation_id='get_products_by_client_id'
 | 
						|
)
 | 
						|
async def get_product(
 | 
						|
        client_id: int,
 | 
						|
        pagination: Annotated[PaginationSchema, Depends(utils.dependecies.pagination_parameters)],
 | 
						|
        session: Annotated[AsyncSession, Depends(get_session)]
 | 
						|
):
 | 
						|
    return await ProductService(session).get_by_client_id(client_id, pagination)
 | 
						|
 | 
						|
 | 
						|
@product_router.get('/get-by-id',
 | 
						|
                    response_model=ProductSchema,
 | 
						|
                    operation_id='get_product_by_id')
 | 
						|
async def get_product_by_id(
 | 
						|
        product_id: int,
 | 
						|
        session: Annotated[AsyncSession, Depends(get_session)]
 | 
						|
):
 | 
						|
    return await ProductService(session).get_by_id(product_id)
 | 
						|
 | 
						|
 | 
						|
@product_router.post(
 | 
						|
    '/barcode/add',
 | 
						|
    response_model=ProductAddBarcodeResponse,
 | 
						|
    operation_id='add_product_barcode'
 | 
						|
)
 | 
						|
async def add_product_barcode(
 | 
						|
        request: ProductAddBarcodeRequest,
 | 
						|
        session: Annotated[AsyncSession, Depends(get_session)]
 | 
						|
):
 | 
						|
    return await ProductService(session).add_barcode(request)
 | 
						|
 | 
						|
 | 
						|
@product_router.get(
 | 
						|
    '/barcode/exists',
 | 
						|
    response_model=ProductExistsBarcodeResponse,
 | 
						|
    operation_id='exists_product_barcode'
 | 
						|
)
 | 
						|
async def exists_product_barcode(
 | 
						|
        product_id: int,
 | 
						|
        barcode: str,
 | 
						|
        session: Annotated[AsyncSession, Depends(get_session)]
 | 
						|
):
 | 
						|
    return await ProductService(session).exists_barcode(product_id, barcode)
 | 
						|
 | 
						|
 | 
						|
@product_router.post(
 | 
						|
    '/barcode/generate',
 | 
						|
    response_model=ProductGenerateBarcodeResponse,
 | 
						|
    operation_id='generate_product_barcode'
 | 
						|
)
 | 
						|
async def generate_product_barcode(
 | 
						|
        request: ProductGenerateBarcodeRequest,
 | 
						|
        session: Annotated[AsyncSession, Depends(get_session)]
 | 
						|
):
 | 
						|
    return await ProductService(session).generate_barcode(request)
 | 
						|
 | 
						|
 | 
						|
@product_router.post(
 | 
						|
    '/barcode/get',
 | 
						|
    response_model=GetProductBarcodeResponse,
 | 
						|
    operation_id='get_product_barcode'
 | 
						|
)
 | 
						|
async def get_product_barcode(
 | 
						|
        request: GetProductBarcodeRequest,
 | 
						|
        session: Annotated[AsyncSession, Depends(get_session)]
 | 
						|
):
 | 
						|
    return await BarcodeService(session).get_barcode(request)
 |