diff --git a/.gitignore b/.gitignore index e5f3eb2..367b00b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .venv/ .idea/ .env +.linux_venv alembic/versions/ \ No newline at end of file diff --git a/backend/config.py b/backend/config.py index 907b938..3e0f027 100644 --- a/backend/config.py +++ b/backend/config.py @@ -8,3 +8,6 @@ PG_PASSWORD = os.environ.get('PG_PASSWORD') PG_PORT = os.environ.get('PG_PORT') PG_HOST = os.environ.get('PG_HOST') PG_DATABASE = os.environ.get('PG_DATABASE') + +CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL') +CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND') diff --git a/background/__init__.py b/background/__init__.py new file mode 100644 index 0000000..645aa30 --- /dev/null +++ b/background/__init__.py @@ -0,0 +1,2 @@ +from .worker import celery +from .tasks import * diff --git a/background/tasks.py b/background/tasks.py new file mode 100644 index 0000000..421fbcd --- /dev/null +++ b/background/tasks.py @@ -0,0 +1,9 @@ +import json + +from background import celery + + +@celery.task(name='test') +def test_task(): + with open('test.json', 'a') as tf: + tf.write(json.dumps({'ok': True})) diff --git a/background/worker.py b/background/worker.py new file mode 100644 index 0000000..1a28bbc --- /dev/null +++ b/background/worker.py @@ -0,0 +1,8 @@ +from celery import Celery +from backend.config import CELERY_RESULT_BACKEND, CELERY_BROKER_URL + +celery = Celery( + __name__, + broker=CELERY_BROKER_URL, + backend=CELERY_RESULT_BACKEND +) diff --git a/database/sipro/__init__.py b/database/sipro/__init__.py index aed4fa3..19b0972 100644 --- a/database/sipro/__init__.py +++ b/database/sipro/__init__.py @@ -1 +1,2 @@ from .models import * +from .enums import * diff --git a/database/sipro/enums/__init__.py b/database/sipro/enums/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/database/sipro/enums/general.py b/database/sipro/enums/general.py new file mode 100644 index 0000000..5452bfb --- /dev/null +++ b/database/sipro/enums/general.py @@ -0,0 +1,8 @@ +from enum import unique, IntEnum + + +@unique +class BaseMarketplace(IntEnum): + WILDBERRIES = 0 + OZON = 1 + YANDEX_MARKET = 2 diff --git a/database/sipro/enums/product.py b/database/sipro/enums/product.py new file mode 100644 index 0000000..f8bcedd --- /dev/null +++ b/database/sipro/enums/product.py @@ -0,0 +1,12 @@ +from enum import IntEnum, unique + + +@unique +class ProductRelationType(IntEnum): + PRODUCT_SAME_COLOR = 2 + RELATION_FIRST = 3 + RELATION_SECOND = 4 + MAIN_PRODUCT = 5 + PRODUCT_DIFFERENT_SIZE = 6 + SAME_MIX = 7 + SAME_PRODUCT = 8 diff --git a/database/sipro/models/general.py b/database/sipro/models/general.py index e151e6e..f25db59 100644 --- a/database/sipro/models/general.py +++ b/database/sipro/models/general.py @@ -44,7 +44,7 @@ class Marketplace(BaseSiproModel): id: Mapped[int] = mapped_column(primary_key=True) name: Mapped[str] = mapped_column() auth_data: Mapped[str] = mapped_column() - + base_marketplace: Mapped[int] = mapped_column() sell_mixes: Mapped[bool] = mapped_column() sell_blocks: Mapped[bool] = mapped_column() sell_warehouse_products: Mapped[bool] = mapped_column() diff --git a/database/stocks/models/general.py b/database/stocks/models/general.py index 67b3109..8fd6f10 100644 --- a/database/stocks/models/general.py +++ b/database/stocks/models/general.py @@ -7,4 +7,3 @@ class DailyStock(BaseStocksModel): __tablename__ = 'daily_stocks' product_id: Mapped[int] = mapped_column(primary_key=True) sold_today: Mapped[int] = mapped_column() - test: Mapped[bool] = mapped_column() diff --git a/limiter/__init__.py b/limiter/__init__.py new file mode 100644 index 0000000..1888d14 --- /dev/null +++ b/limiter/__init__.py @@ -0,0 +1 @@ +from .batch_limiter import BatchLimiter diff --git a/limiter/batch_limiter.py b/limiter/batch_limiter.py new file mode 100644 index 0000000..bf205e9 --- /dev/null +++ b/limiter/batch_limiter.py @@ -0,0 +1,29 @@ +import asyncio +from datetime import datetime + + +class BatchLimiter: + def __init__(self, max_requests, period): + self.max_requests = max_requests + self.period = period + self.current_requests = 0 + self.start_time = None + self.lock = asyncio.Lock() + + async def acquire(self): + async with self.lock: + if self.current_requests == 0: + self.start_time = datetime.now() + + if self.current_requests < self.max_requests: + self.current_requests += 1 + return + + elapsed_time = (datetime.now() - self.start_time).total_seconds() + if elapsed_time < self.period: + await asyncio.sleep(self.period - elapsed_time) + self.current_requests = 1 + self.start_time = datetime.now() + else: + self.current_requests = 1 + self.start_time = datetime.now() diff --git a/main.py b/main.py index d5baace..55bc5e4 100644 --- a/main.py +++ b/main.py @@ -1,302 +1,23 @@ from typing import Annotated -from fastapi import FastAPI, Depends -from sqlalchemy import select, func, and_, cast, String +from celery.result import AsyncResult +from fastapi import FastAPI, Depends, Body +from sqlalchemy import select +from sqlalchemy.dialects.postgresql import insert from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import joinedload +from starlette.responses import JSONResponse from backend.session import get_session +from database import DailyStock from database.sipro import * +from queries.general import get_stocks_data +from background.tasks import * +from updaters.stocks_updater import StockUpdate app = FastAPI() -def get_marketplace_suppliers_and_company_warehouses(marketplace: Marketplace): - company = marketplace.company - suppliers = set() - company_warehouses = set() - for warehouse in marketplace.warehouses: - for supplier in warehouse.suppliers: - if supplier.is_pseudo: - continue - suppliers.add(supplier) - company_warehouses.update(warehouse.company_warehouses) - if marketplace.sell_warehouse_products: - company_warehouse = company.warehouse - if company_warehouse and not company.is_denco: - company_warehouses.add(company_warehouse) - return suppliers, company_warehouses - - -# def get_stocks_data(marketplace: Marketplace): -# suppliers, company_warehouses = get_marketplace_suppliers_and_company_warehouses(marketplace) -# supplier_ids = [supplier.id for supplier in suppliers] -# company_warehouse_ids = [warehouse.id for warehouse in company_warehouses] -# -# sell_mixes: bool = marketplace.sell_mixes -# sell_blocks: bool = marketplace.sell_blocks -# sell_warehouse_products: bool = marketplace.sell_warehouse_products -# sell_from_price: int = marketplace.sell_from_price -# -# stmt = ( -# select( -# MarketplaceProduct -# ) -# .join( -# Product -# ) -# .options( -# joinedload(MarketplaceProduct.product) -# ) -# .where( -# MarketplaceProduct.marketplace_id == marketplace.id -# ) -# ) -# supplier_stock_subquery = ( -# select( -# func.sum(SupplierProduct.supplier_stock).label('supplier_stock'), -# Product.id.label('product_id') -# ) -# .join( -# Product -# ) -# .where( -# SupplierProduct.supplier_id.in_(supplier_ids) -# ) -# .group_by( -# Product.id -# ) -# .subquery() -# ) -# -# warehouse_stock_subquery = ( -# select( -# func.count(CompanyWarehouseProduct.is_sold).label('warehouse_stock'), -# CompanyWarehouseProduct.product_id.label('product_id') -# ) -# .where( -# CompanyWarehouseProduct.is_sold == False, -# CompanyWarehouseProduct.company_warehouse_id.in_(company_warehouse_ids) -# ) -# .group_by( -# CompanyWarehouseProduct.product_id -# ) -# .subquery() -# ) - -async def get_stocks_data(session: AsyncSession, marketplace: Marketplace): - company = marketplace.company - suppliers, company_warehouses = get_marketplace_suppliers_and_company_warehouses(marketplace) - supplier_ids = [supplier.id for supplier in suppliers] - company_warehouse_ids = [warehouse.id for warehouse in company_warehouses] - - sell_mixes: bool = marketplace.sell_mixes - sell_blocks: bool = marketplace.sell_blocks - sell_warehouse_products: bool = marketplace.sell_warehouse_products - sell_from_price: int = marketplace.sell_from_price - - supplier_stock_subquery = ( - select( - func.sum(SupplierProduct.supplier_stock).label('supplier_stock'), - SupplierProduct.product_id.label('product_id') - ) - .select_from(SupplierProduct) - .join(Product) - .where(SupplierProduct.supplier_id.in_(supplier_ids)) - .group_by(SupplierProduct.product_id) - .subquery() - ) - - warehouse_stock_subquery = ( - select( - func.count(CompanyWarehouseProduct.is_sold).label('warehouse_stock'), - CompanyWarehouseProduct.product_id.label('product_id') - ) - .select_from(CompanyWarehouseProduct) - .where( - CompanyWarehouseProduct.is_sold == False, - CompanyWarehouseProduct.company_warehouse_id.in_(company_warehouse_ids) - ) - .group_by(CompanyWarehouseProduct.product_id) - .subquery() - ) - - mix_stock_first_subquery = ( - select( - func.sum(SupplierProduct.supplier_stock).label('master_stock'), - SupplierProduct.product_id.label('product_id') - ) - .select_from(SupplierProduct) - .where(SupplierProduct.supplier_id.in_(supplier_ids)) - .group_by(SupplierProduct.product_id) - .subquery() - ) - - mix_stock_full_subquery = ( - select( - func.min(SupplierProduct.in_block).label('mix_stock'), - Product.id.label('product_id') - ) - .select_from(Product) - .join(SupplierProduct) - .join(ProductRelation, Product.id == ProductRelation.slave_product_id) - .join(mix_stock_first_subquery, mix_stock_first_subquery.c.product_id == ProductRelation.master_product_id) - .where( - ProductRelation.relation_type == 5, - mix_stock_first_subquery.c.master_stock > 0 - ) - .group_by(Product.id) - .subquery() - ) - - is_master_first_subquery = ( - select( - ProductRelation.master_product_id, - (func.count(ProductRelation.master_product_id) > 0).label('is_master') - ) - .where( - ProductRelation.relation_type == 5 - ) - .group_by(ProductRelation.master_product_id) - .subquery() - ) - - is_master_subquery = ( - select( - Product.id.label('product_id'), - func.coalesce(is_master_first_subquery.c.is_master, False).label('is_master') - ) - .select_from( - Product - ) - .outerjoin( - is_master_first_subquery, - Product.id == is_master_first_subquery.c.master_product_id - ) - .subquery() - ) - - in_block_subquery = ( - select( - Product.id.label('product_id'), - func.min(SupplierProduct.in_block).label('in_block_value') - ) - .select_from(Product) - .join(SupplierProduct) - .where( - SupplierProduct.supplier_id.in_(supplier_ids), - SupplierProduct.supplier_stock > 0 - ) - .group_by(Product.id) - .subquery() - ) - slaves_stock_first_subquery = ( - select( - ProductRelation.master_product_id.label('product_id'), - func.sum(SupplierProduct.supplier_stock).label('slaves_stock') - ) - .select_from(ProductRelation) - .join(SupplierProduct, and_( - ProductRelation.slave_product_id == SupplierProduct.product_id, - ProductRelation.relation_type == 7 - )) - .where(SupplierProduct.supplier_id.in_(supplier_ids)) - .group_by(ProductRelation.master_product_id) - .subquery() - ) - - slaves_stock_subquery = ( - select( - Product.id.label('product_id'), - slaves_stock_first_subquery.c.slaves_stock.label('slaves_stock') - ) - .select_from(Product) - .join(slaves_stock_first_subquery, slaves_stock_first_subquery.c.product_id == Product.id) - .subquery() - ) - - stmt = ( - select( - MarketplaceProduct, - func.coalesce(Product.article, cast(Product.denco_article, String)).label('denco_article'), - MarketplaceProduct.mp_price_bought.label('price_purchase'), - supplier_stock_subquery.c.supplier_stock.label('supplier_stock'), - warehouse_stock_subquery.c.warehouse_stock.label('warehouse_stock'), - mix_stock_full_subquery.c.mix_stock.label('mix_stock'), - in_block_subquery.c.in_block_value.label('in_block_value'), - is_master_subquery.c.is_master.label('is_master'), - slaves_stock_subquery.c.slaves_stock.label('slaves_stock'), - MarketplaceProduct.price_recommended.label('price_recommended'), - MarketplaceProduct.is_archived.label('is_archived') - ) - .select_from(MarketplaceProduct) - .join(Product) - .options(joinedload(MarketplaceProduct.product)) - .where(MarketplaceProduct.marketplace_id == marketplace.id) - .outerjoin(supplier_stock_subquery, supplier_stock_subquery.c.product_id == MarketplaceProduct.product_id) - .outerjoin(warehouse_stock_subquery, warehouse_stock_subquery.c.product_id == MarketplaceProduct.product_id) - .outerjoin(mix_stock_full_subquery, mix_stock_full_subquery.c.product_id == MarketplaceProduct.product_id) - .outerjoin(in_block_subquery, in_block_subquery.c.product_id == MarketplaceProduct.product_id) - .outerjoin(is_master_subquery, is_master_subquery.c.product_id == MarketplaceProduct.product_id) - .outerjoin(slaves_stock_subquery, slaves_stock_subquery.c.product_id == MarketplaceProduct.product_id) - ) - print(stmt.compile(compile_kwargs={ - 'literal_binds': True - })) - - result = await session.execute(stmt) - marketplace_products = result.all() - result = [] - for marketplace_product, denco_article, price_purchase, supplier_stock, warehouse_stock, mix_stock, in_block_value, is_master, slaves_stock, price_recommended, is_archived in marketplace_products: - if is_archived or (sell_from_price > price_recommended): - result.append({ - 'denco_article': denco_article, - 'full_stock': 0, - # 'marketplace_product': marketplace_product, - }) - continue - - is_mix = mix_stock is not None - - in_block_value = in_block_value or 1 - price_purchase = price_purchase or 0 - supplier_stock = supplier_stock or 0 - warehouse_stock = warehouse_stock or 0 - mix_stock = mix_stock or 0 - slaves_stock = slaves_stock or 0 - - if not sell_warehouse_products: - warehouse_stock = 0 - - if all([is_mix, slaves_stock > 0]): - mix_stock = 0 - balance_limit = price_purchase > company.balance - - if balance_limit: - supplier_stock = 0 - full_stock = supplier_stock + warehouse_stock - - if all([not is_mix, not sell_blocks, in_block_value > 1]): - full_stock = warehouse_stock - if sell_mixes and (not balance_limit): - full_stock += mix_stock - if (not sell_mixes) and is_master: - full_stock = warehouse_stock - if (not sell_mixes) and is_mix: - full_stock = warehouse_stock - if 45 > company.balance: - full_stock = 0 - full_stock = max([0, full_stock]) - - result.append({ - 'denco_article': denco_article, - 'full_stock': full_stock, - # 'marketplace_product': marketplace_product, - }) - - return result - - @app.get("/") async def root( session: Annotated[AsyncSession, Depends(get_session)], @@ -312,6 +33,19 @@ async def root( return {"message": data} -@app.get("/hello/{name}") -async def say_hello(name: str): - return {"message": f"Hello {name}"} +@app.post("/tasks", status_code=201) +def run_task(payload=Body(...)): + task_type = payload["type"] + task = test_task.delay() + return JSONResponse({"task_id": task.id}) + + +@app.get("/tasks/{task_id}") +def get_status(task_id): + task_result = AsyncResult(task_id) + result = { + "task_id": task_id, + "task_status": task_result.status, + "task_result": task_result.result + } + return JSONResponse(result) diff --git a/marketplaces/__init__.py b/marketplaces/__init__.py new file mode 100644 index 0000000..1f797af --- /dev/null +++ b/marketplaces/__init__.py @@ -0,0 +1,3 @@ +from .ozon import OzonMarketplace +from .wildberries import WildberriesMarketplace +from .factory import MarketplaceFactory diff --git a/marketplaces/base.py b/marketplaces/base.py new file mode 100644 index 0000000..f0254b4 --- /dev/null +++ b/marketplaces/base.py @@ -0,0 +1,37 @@ +from abc import ABC, abstractmethod +from typing import Literal, Union + +import aiohttp +from aiohttp import ClientResponse + +from database import Marketplace + + +class BaseJsonMarketplace(ABC): + @abstractmethod + def __init__(self, marketplace: Marketplace): + pass + + @abstractmethod + async def update_stocks(self, data: Union[list, dict]): + pass + + @abstractmethod + def get_headers(self): + pass + + @abstractmethod + @property + def api_url(self): + pass + + async def _method(self, http_method: Literal['POST', 'GET', 'PATCH', 'PUT', 'DELETE'], + method: str, + data: dict) -> ClientResponse: + async with aiohttp.ClientSession as session: + async with session.request(http_method, + f'{self.api_url}{method}', + json=data, + headers=self.get_headers() + ) as response: + return response diff --git a/marketplaces/factory.py b/marketplaces/factory.py new file mode 100644 index 0000000..57ffb40 --- /dev/null +++ b/marketplaces/factory.py @@ -0,0 +1,19 @@ +from typing import Union + +from database import Marketplace +from database.sipro.enums.general import BaseMarketplace +from .wildberries import WildberriesMarketplace +from .ozon import OzonMarketplace + + +class MarketplaceFactory: + @staticmethod + def get_marketplace(marketplace: Marketplace) -> Union[ + WildberriesMarketplace, + OzonMarketplace, + ]: + match marketplace.base_marketplace: + case BaseMarketplace.OZON: + return OzonMarketplace(marketplace) + case BaseMarketplace.WILDBERRIES: + return WildberriesMarketplace(marketplace) diff --git a/marketplaces/ozon.py b/marketplaces/ozon.py new file mode 100644 index 0000000..1604254 --- /dev/null +++ b/marketplaces/ozon.py @@ -0,0 +1,54 @@ +import json +import logging +from typing import Union + +from aiolimiter import AsyncLimiter +from asynciolimiter import StrictLimiter + +import utils +from database import Marketplace +from limiter import BatchLimiter +from marketplaces.base import BaseJsonMarketplace + + +class OzonMarketplace(BaseJsonMarketplace): + + def __init__(self, marketplace: Marketplace): + self.marketplace = marketplace + auth_data = json.loads(marketplace.auth_data) + + self.headers = { + 'Client-Id': auth_data.get('clientId'), + 'Api-Key': auth_data.get('clientToken') + } + + def get_headers(self): + return self.headers + + def api_url(self): + return 'https://api-seller.ozon.ru' + + async def update_stocks(self, data: Union[list, dict]): + if type(data) is not list: + return + max_stocks = 100 + chunks = utils.chunk_list(data, max_stocks) + limiter = BatchLimiter(max_requests=80, + period=60) + for chunk in chunks: + try: + await limiter.acquire() + response = await self._method('POST', + '/v2/products/stocks', + data=chunk) + response = await response.json() + # response = await + error_message = response.get('message') + error_code = response.get('code') + if error_message: + logging.warning( + f'Error occurred when sending stocks to [{self.marketplace.id}]: {error_message} ({error_code})') + break + except Exception as e: + logging.error( + f'Exception occurred while sending stocks to marketplace ID [{self.marketplace.id}]: {str(e)}') diff --git a/marketplaces/wildberries.py b/marketplaces/wildberries.py new file mode 100644 index 0000000..dac4bf7 --- /dev/null +++ b/marketplaces/wildberries.py @@ -0,0 +1,49 @@ +import json +import logging +from typing import Union + +import utils +from database import Marketplace +from limiter import BatchLimiter +from marketplaces.base import BaseJsonMarketplace + + +class WildberriesMarketplace(BaseJsonMarketplace): + def __init__(self, marketplace: Marketplace): + self.marketplace = marketplace + auth_data = json.loads(marketplace.auth_data) + + self.headers = { + 'Authorization': auth_data.get('token'), + 'Content-Type': 'application/json' + } + + def get_headers(self): + return self.headers + + def api_url(self): + return 'https://suppliers-api.wildberries.ru' + + async def update_stocks(self, data: Union[list, dict]): + if type(data) is not list: + return + max_stocks = 1000 + chunks = utils.chunk_list(data, max_stocks) + limiter = BatchLimiter(max_requests=300, + period=60) + for chunk in chunks: + try: + await limiter.acquire() + response = await self._method('PUT', + '/api/v3/stocks/{warehouseId}', + chunk) + if response.status != 204: + response = await response.json() + error_message = response.get('message') + error_code = response.get('code') + logging.warning( + f'Error occurred when sending stocks to [{self.marketplace.id}]: {error_message} ({error_code})') + break + except Exception as e: + logging.error( + f'Exception occurred while sending stocks to marketplace ID [{self.marketplace.id}]: {str(e)}') diff --git a/queries/general.py b/queries/general.py new file mode 100644 index 0000000..ce5ec82 --- /dev/null +++ b/queries/general.py @@ -0,0 +1,337 @@ +from typing import Union + +from sqlalchemy import select, func, and_, cast, String, case, or_ +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import joinedload + +from database import DailyStock +from database.sipro import * +from database.sipro.enums.product import ProductRelationType + + +def get_marketplace_suppliers_and_company_warehouses(marketplace: Marketplace): + company = marketplace.company + suppliers = set() + company_warehouses = set() + for warehouse in marketplace.warehouses: + for supplier in warehouse.suppliers: + if supplier.is_pseudo: + continue + suppliers.add(supplier) + company_warehouses.update(warehouse.company_warehouses) + if marketplace.sell_warehouse_products: + company_warehouse = company.warehouse + if company_warehouse and not company.is_denco: + company_warehouses.add(company_warehouse) + return list(suppliers), list(company_warehouses) + + +async def get_stocks_data( + session: AsyncSession, + marketplace: Marketplace, + product_ids: Union[list[int], None] = None +): + if not product_ids: + product_ids = [] + company = marketplace.company + suppliers, company_warehouses = get_marketplace_suppliers_and_company_warehouses(marketplace) + supplier_ids = [supplier.id for supplier in suppliers] + company_warehouse_ids = [warehouse.id for warehouse in company_warehouses] + + sell_mixes: bool = marketplace.sell_mixes + sell_blocks: bool = marketplace.sell_blocks + sell_warehouse_products: bool = marketplace.sell_warehouse_products + sell_from_price: int = marketplace.sell_from_price + + supplier_stock_subquery = ( + select( + func.greatest( + func.sum(SupplierProduct.supplier_stock) - func.coalesce(DailyStock.sold_today, 0), + 0 + ) + .label('supplier_stock'), + SupplierProduct.product_id.label('product_id') + ) + .select_from( + SupplierProduct + ) + .join( + Product + ) + .outerjoin( + DailyStock, + DailyStock.product_id == SupplierProduct.product_id + ) + .where( + SupplierProduct.supplier_id.in_(supplier_ids) + ) + .group_by( + SupplierProduct.product_id + ) + .subquery() + ) + + warehouse_stock_subquery = ( + select( + func.count(CompanyWarehouseProduct.is_sold).label('warehouse_stock'), + CompanyWarehouseProduct.product_id.label('product_id') + ) + .select_from( + CompanyWarehouseProduct + ) + .where( + CompanyWarehouseProduct.is_sold == False, + CompanyWarehouseProduct.company_warehouse_id.in_(company_warehouse_ids) + ) + .group_by( + CompanyWarehouseProduct.product_id + ) + .subquery() + ) + + mix_stock_first_subquery = ( + select( + func.sum(SupplierProduct.supplier_stock).label('master_stock'), + SupplierProduct.product_id.label('product_id') + ) + .select_from( + SupplierProduct + ) + .where( + SupplierProduct.supplier_id.in_(supplier_ids) + ) + .group_by( + SupplierProduct.product_id + ) + .subquery() + ) + + mix_stock_full_subquery = ( + select( + func.min(SupplierProduct.in_block).label('mix_stock'), + Product.id.label('product_id') + ) + .select_from( + Product + ) + .join( + SupplierProduct + ) + .join( + ProductRelation, + Product.id == ProductRelation.slave_product_id + ) + .join( + mix_stock_first_subquery, + mix_stock_first_subquery.c.product_id == ProductRelation.master_product_id + ) + .where( + ProductRelation.relation_type == ProductRelationType.MAIN_PRODUCT, + mix_stock_first_subquery.c.master_stock > 0 + ) + .group_by( + Product.id + ) + .subquery() + ) + + is_master_first_subquery = ( + select( + ProductRelation.master_product_id, + (func.count(ProductRelation.master_product_id) > 0).label('is_master') + ) + .where( + ProductRelation.relation_type == ProductRelationType.MAIN_PRODUCT + ) + .group_by( + ProductRelation.master_product_id + ) + .subquery() + ) + + is_master_subquery = ( + select( + Product.id.label('product_id'), + func.coalesce(is_master_first_subquery.c.is_master, False).label('is_master') + ) + .select_from( + Product + ) + .outerjoin( + is_master_first_subquery, + Product.id == is_master_first_subquery.c.master_product_id + ) + .subquery() + ) + + in_block_subquery = ( + select( + Product.id.label('product_id'), + func.min(SupplierProduct.in_block).label('in_block_value') + ) + .select_from( + Product + ) + .join( + SupplierProduct + ) + .where( + SupplierProduct.supplier_id.in_(supplier_ids), + SupplierProduct.supplier_stock > 0 + ) + .group_by( + Product.id + ) + .subquery() + ) + slaves_stock_first_subquery = ( + select( + ProductRelation.master_product_id.label('product_id'), + func.sum(SupplierProduct.supplier_stock).label('slaves_stock') + ) + .select_from( + ProductRelation + ) + .join( + SupplierProduct, + and_( + ProductRelation.slave_product_id == SupplierProduct.product_id, + ProductRelation.relation_type == ProductRelationType.SAME_MIX + ) + ) + .where( + SupplierProduct.supplier_id.in_( + supplier_ids + ) + ) + .group_by( + ProductRelation.master_product_id + ) + .subquery() + ) + + slaves_stock_subquery = ( + select( + Product.id.label('product_id'), + slaves_stock_first_subquery.c.slaves_stock.label('slaves_stock') + ) + .select_from( + Product + ) + .join( + slaves_stock_first_subquery, + slaves_stock_first_subquery.c.product_id == Product.id + ) + .subquery() + ) + + stmt = ( + select( + MarketplaceProduct, + func.coalesce(Product.article, cast(Product.denco_article, String)).label('denco_article'), + func.coalesce(MarketplaceProduct.mp_price_bought, 0).label('price_purchase'), + func.coalesce(supplier_stock_subquery.c.supplier_stock, 0).label('supplier_stock'), + case( + ( + sell_warehouse_products, + func.coalesce(warehouse_stock_subquery.c.warehouse_stock, 0) + ), + else_=0) + .label('warehouse_stock'), + func.coalesce(mix_stock_full_subquery.c.mix_stock, 0).label('mix_stock'), + func.coalesce(in_block_subquery.c.in_block_value, 1).label('in_block_value'), + is_master_subquery.c.is_master.label('is_master'), + func.coalesce(slaves_stock_subquery.c.slaves_stock, 0).label('slaves_stock'), + MarketplaceProduct.price_recommended.label('price_recommended'), + MarketplaceProduct.is_archived.label('is_archived') + ) + .select_from( + MarketplaceProduct + ) + .join( + Product + ) + .options( + joinedload(MarketplaceProduct.product) + ) + .where( + MarketplaceProduct.marketplace_id == marketplace.id, + or_( + len(product_ids) == 0, + MarketplaceProduct.product_id.in_(product_ids) + ) + ) + .outerjoin( + supplier_stock_subquery, + supplier_stock_subquery.c.product_id == MarketplaceProduct.product_id + ) + .outerjoin( + warehouse_stock_subquery, + warehouse_stock_subquery.c.product_id == MarketplaceProduct.product_id + ) + .outerjoin( + mix_stock_full_subquery, + mix_stock_full_subquery.c.product_id == MarketplaceProduct.product_id + ) + .outerjoin( + in_block_subquery, + in_block_subquery.c.product_id == MarketplaceProduct.product_id + ) + .outerjoin( + is_master_subquery, + is_master_subquery.c.product_id == MarketplaceProduct.product_id + ) + .outerjoin( + slaves_stock_subquery, + slaves_stock_subquery.c.product_id == MarketplaceProduct.product_id + ) + ) + result = await session.execute(stmt) + marketplace_products = result.all() + result = [] + for (marketplace_product, + denco_article, + price_purchase, + supplier_stock, + warehouse_stock, + mix_stock, + in_block_value, + is_master, + slaves_stock, + price_recommended, + is_archived) in marketplace_products: + if is_archived or (sell_from_price > price_recommended): + result.append({ + 'denco_article': denco_article, + 'full_stock': 0, + 'marketplace_product': marketplace_product, + }) + continue + is_mix = mix_stock is not None + if all([is_mix, slaves_stock > 0]): + mix_stock = 0 + balance_limit = price_purchase > company.balance + + if balance_limit: + supplier_stock = 0 + full_stock = supplier_stock + warehouse_stock + + if all([not is_mix, not sell_blocks, in_block_value > 1]): + full_stock = warehouse_stock + if sell_mixes and (not balance_limit): + full_stock += mix_stock + if (not sell_mixes) and is_master: + full_stock = warehouse_stock + if (not sell_mixes) and is_mix: + full_stock = warehouse_stock + if 45 > company.balance: + full_stock = 0 + full_stock = max([0, full_stock]) + + result.append({ + 'denco_article': denco_article, + 'full_stock': full_stock, + 'marketplace_product': marketplace_product, + }) + + return result diff --git a/requirements.txt b/requirements.txt index fbc95c2..4fe2c9a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,4 +10,4 @@ alembic python-dotenv aiohttp aiohttp[speedups] -celery[redis] \ No newline at end of file +celery[redis] diff --git a/start_celery.sh b/start_celery.sh new file mode 100644 index 0000000..da59513 --- /dev/null +++ b/start_celery.sh @@ -0,0 +1 @@ +celery -A background.celery worker --loglevel=info diff --git a/test.py b/test.py index 6aad900..68a4aa6 100644 --- a/test.py +++ b/test.py @@ -1,3 +1,7 @@ -from alembic.env import run_migrations_offline +a = [ + (1, [1, 4, 5, ]), + (2, [1, 2, 3, 4, 5, ]), +] -run_migrations_offline() \ No newline at end of file +updates_list = sorted(a, key=lambda x: x[1], reverse=True) +print(updates_list) diff --git a/updaters/__init__.py b/updaters/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/updaters/base.py b/updaters/base.py new file mode 100644 index 0000000..a4feed9 --- /dev/null +++ b/updaters/base.py @@ -0,0 +1,15 @@ +from abc import ABC, abstractmethod +from typing import List + +from database import Marketplace +from updaters.stocks_updater import StockUpdate + + +class BaseMarketplaceUpdater(ABC): + @abstractmethod + def __init__(self, marketplace: Marketplace): + pass + + @abstractmethod + async def update(self, updates: List[StockUpdate]): + pass diff --git a/updaters/ozon_updater.py b/updaters/ozon_updater.py new file mode 100644 index 0000000..50edd94 --- /dev/null +++ b/updaters/ozon_updater.py @@ -0,0 +1,14 @@ +from typing import List + +from database import Marketplace +from marketplaces import MarketplaceFactory, OzonMarketplace +from updaters.base import BaseMarketplaceUpdater +from updaters.stocks_updater import StockUpdate + + +class OzonUpdater(BaseMarketplaceUpdater): + def __init__(self, marketplace: Marketplace): + self.ozon_marketplace: OzonMarketplace = MarketplaceFactory.get_marketplace(marketplace) + + async def update(self, updates: List[StockUpdate]): + pass diff --git a/updaters/stocks_updater.py b/updaters/stocks_updater.py new file mode 100644 index 0000000..ae48628 --- /dev/null +++ b/updaters/stocks_updater.py @@ -0,0 +1,83 @@ +from collections import defaultdict +from dataclasses import dataclass +from enum import unique, IntEnum +from typing import List, Union + +from sqlalchemy import select +from sqlalchemy.dialects.postgresql import insert +from sqlalchemy.ext.asyncio import AsyncSession + +import database +from database import Marketplace, MarketplaceProduct, DailyStock + + +@unique +class StockUpdateType(IntEnum): + SALE = 0 + SUPPLIER_UPDATE = 1 + WAREHOUSE_UPDATE = 2 + +@dataclass +class StockUpdate: + product_id: int + type: StockUpdateType + quantity: int + + +class StocksUpdater: + def __init__(self, session: AsyncSession): + self.session = session + + async def update_marketplace(self, marketplace_id: int, updates: List[StockUpdate]): + pass + + async def update(self, updates: list[StockUpdate]): + updates_dict = defaultdict(list) + stock_update_values = [] + for update in updates: + # Working with sold today + if update.type == StockUpdateType.SALE: + stock_update_values.append({ + 'product_id': update.product_id, + 'sold_today': update.quantity + }) + # Working with marketplaces + stmt = ( + select( + MarketplaceProduct.marketplace_id.distinct() + ) + .where( + MarketplaceProduct.product_id == update.product_id + ) + ) + stmt_result = await self.session.execute(stmt) + marketplace_ids = stmt_result.scalars().all() + if not marketplace_ids: + continue + for marketplace_id in marketplace_ids: + updates_dict[marketplace_id].append(update) + updates_list = list(updates_dict.items()) + updates_list = sorted(updates_list, key=lambda x: x[1]) + + # Updating DailyStock-s + insert_stmt = ( + insert( + DailyStock + ) + .values( + stock_update_values + ) + ) + insert_stmt = ( + insert_stmt.on_conflict_do_update( + index_elements=['product_id'], + set_={ + 'sold_today': DailyStock.sold_today + insert_stmt.excluded.sold_today + } + ) + ) + await self.session.execute(insert_stmt) + await self.session.commit() + + for marketplace_id, marketplace_updates in updates_list: + await self.update_marketplace(marketplace_id, marketplace_updates) diff --git a/updaters/wildberries_updater.py b/updaters/wildberries_updater.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/updaters/wildberries_updater.py @@ -0,0 +1 @@ + diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..2dad89d --- /dev/null +++ b/utils.py @@ -0,0 +1,7 @@ +def _chunk_list(array: list, chunk_size: int): + for i in range(0, len(array), chunk_size): + yield array[i:i + chunk_size] + + +def chunk_list(array: list, chunk_size: int) -> list[list]: + return list(_chunk_list(array, chunk_size))