from sqlalchemy import select, delete, func, literal
from sqlalchemy.orm import aliased, selectinload

from models_wms import PlaceType, Place
from schemas.warehouse_management import *
from services.base import BaseService


class WmsService(BaseService):
    """Service for managing warehouse place types and places.

    All database access goes through ``self.session`` (presumably an async
    SQLAlchemy session provided by ``BaseService`` -- confirm there).
    """

    @staticmethod
    def create_places_from_list(nodes: list) -> list:
        """Assemble a flat list of nodes into a forest.

        Each node must expose ``id``, ``parent_id`` and a mutable ``children``
        list. Nodes are indexed by ``id``; when several nodes share an id the
        last one wins. A node whose parent is absent from ``nodes`` is dropped
        from the result.

        Returns:
            The nodes whose ``parent_id`` is ``None``, with descendants
            attached through ``children``.
        """
        node_map = {node.id: node for node in nodes}
        root_nodes = []
        for node in node_map.values():
            if node.parent_id is None:
                root_nodes.append(node)
                continue
            parent = node_map.get(node.parent_id)
            if parent:
                parent.children.append(node)
        return root_nodes

    async def get_place_types(self) -> GetPlaceTypesResponse:
        """Return all place types arranged as a tree."""
        pt_alias = aliased(PlaceType)
        # Root place types together with the number of places joined to them.
        base_query = (
            select(
                PlaceType.id,
                PlaceType.name,
                PlaceType.parent_id,
                func.count(Place.id).label("count"),
            )
            .join(Place)
            .group_by(PlaceType.id, PlaceType.parent_id, PlaceType.name)
            .where(PlaceType.parent_id.is_(None))
        )
        # NOTE(review): this member joins PlaceType rather than the CTE
        # itself, so it is not a true recursive step; it selects every place
        # type with a literal count of 0 -- confirm this is intended.
        recursive_query = select(
            pt_alias.id,
            pt_alias.name,
            pt_alias.parent_id,
            literal(0).label("count"),
        ).join(
            PlaceType,
            PlaceType.id == pt_alias.parent_id,
            isouter=True,
        )
        cte = base_query.union_all(recursive_query).cte(recursive=True)
        result = await self.session.execute(select(cte))

        flat_types = [
            PlaceTypeSchema(
                id=type_id,
                name=name,
                children=[],
                parent_id=parent_id,
                places_count=places_count,
            )
            for type_id, name, parent_id, places_count in result
        ]
        tree = WmsService.create_places_from_list(flat_types)
        return GetPlaceTypesResponse(place_types=tree)

    async def get_flat_place_types(self, parent_place_type_id: int) -> GetFlatPlaceTypesResponse:
        """Return the direct children of a place type as a flat list.

        Args:
            parent_place_type_id: Id of the parent place type; ``-1`` selects
                the place types that have no parent.
        """
        if parent_place_type_id != -1:
            parent_filter = PlaceType.parent_id == parent_place_type_id
        else:
            parent_filter = PlaceType.parent_id.is_(None)
        stmt = select(PlaceType).where(parent_filter)
        place_types = (await self.session.scalars(stmt)).all()
        return GetFlatPlaceTypesResponse(place_types=place_types)

    async def create_place_type(self, request: CreatePlaceTypeRequest) -> CreatePlaceTypeResponse:
        """Create a place type from the name and parent id in the request."""
        place_type = PlaceType(
            name=request.place_type.name,
            parent_id=request.place_type.parent_id,
        )
        self.session.add(place_type)
        await self.session.commit()
        return CreatePlaceTypeResponse(ok=True, message='Тип места на складе успешно создан')

    async def edit_place_type(self, request: EditPlaceTypeRequest) -> EditPlaceTypeResponse:
        """Rename an existing place type.

        Returns a response with ``ok=False`` when no place type with the
        requested id exists.
        """
        place_type = await self.session.get(PlaceType, request.place_type.id)
        if place_type is None:
            # session.get() yields None for an unknown primary key; report it
            # instead of failing on attribute assignment below.
            return EditPlaceTypeResponse(ok=False, message='Тип места на складе не найден')
        place_type.name = request.place_type.name
        await self.session.commit()
        return EditPlaceTypeResponse(ok=True, message='Тип места на складе успешно изменен')

    async def delete_place_type(self, place_type_id: int) -> DeletePlaceTypeResponse:
        """Delete the place type with the given id and commit."""
        stmt = delete(PlaceType).where(PlaceType.id == place_type_id)
        await self.session.execute(stmt)
        await self.session.commit()
        return DeletePlaceTypeResponse(ok=True, message='Тип места на складе успешно удален')

    async def get_raw_places(self) -> list[dict]:
        """Fetch places joined with their place type and its child count.

        Returns:
            One mapping per result row, combining the place columns with the
            place type columns and ``child_count``.
        """
        pt_alias = aliased(Place)
        base_query = select(Place)
        recursive_query = (
            select(pt_alias)
            .join(Place, Place.id == pt_alias.parent_id)
        )
        cte = base_query.union_all(recursive_query).cte(recursive=True)
        recur_sub = select(cte).subquery()

        # Correlated scalar subquery: how many place types have this one as
        # their parent.
        place_type_child = aliased(PlaceType)
        place_type_count_sub = (
            select(func.count(place_type_child.id).label("child_count"))
            .where(PlaceType.id == place_type_child.parent_id)
            .scalar_subquery()
        )
        place_type_sub = (
            select(PlaceType, place_type_count_sub.label("child_count"))
            .order_by(PlaceType.id)
            .subquery()
        )

        res_stmt = (
            select(recur_sub, place_type_sub)
            .join(place_type_sub, place_type_sub.c.id == recur_sub.c.place_type_id)
        )
        result = await self.session.execute(res_stmt)
        result_dicts = result.mappings().all()
        return list(result_dicts)

    async def get_places(self) -> GetPlacesResponse:
        """Return all places as a tree, each carrying its place type info."""
        rows = await self.get_raw_places()
        places = []
        for row in rows:
            # 'parent_id_1' is read as the place type's parent id, while
            # 'parent_id' is used as the place's own parent.
            place_type = BasePlaceTypeWithCountSchema(
                child_count=row['child_count'],
                parent_id=row['parent_id_1'],
                name=row['name'],
            )
            places.append(
                PlaceSchema(
                    id=row['id'],
                    number=row['number'],
                    children=[],
                    parent_id=row['parent_id'],
                    place_type=place_type,
                    place_type_id=row['place_type_id'],
                )
            )
        tree = WmsService.create_places_from_list(places)
        return GetPlacesResponse(places=tree)

    async def create_place(self, request: CreatePlaceRequest) -> CreatePlaceResponse:
        """Create a place numbered one above the highest sibling number.

        Siblings are the places sharing the requested ``parent_id``; when the
        maximum is missing (or zero) numbering starts at 1.
        """
        stmt = (
            select(func.max(Place.number))
            .where(Place.parent_id == request.place.parent_id)
        )
        last_number = (await self.session.scalar(stmt)) or 0
        place = Place(
            number=last_number + 1,
            parent_id=request.place.parent_id,
            place_type_id=request.place.place_type_id,
        )
        self.session.add(place)
        await self.session.commit()
        return CreatePlaceResponse(ok=True, message='Место на складе успешно создано')

    async def edit_place(self, request: EditPlaceRequest) -> EditPlaceResponse:
        """Change the number of an existing place.

        Returns a response with ``ok=False`` when no place with the requested
        id exists.
        """
        place = await self.session.get(Place, request.place.id)
        if place is None:
            # session.get() yields None for an unknown primary key; report it
            # instead of failing on attribute assignment below.
            return EditPlaceResponse(ok=False, message='Место на складе не найдено')
        place.number = request.place.number
        await self.session.commit()
        return EditPlaceResponse(ok=True, message='Место на складе успешно изменено')

    async def delete_place(self, place_id: int) -> DeletePlaceResponse:
        """Delete the place with the given id and commit."""
        stmt = delete(Place).where(Place.id == place_id)
        await self.session.execute(stmt)
        await self.session.commit()
        return DeletePlaceResponse(ok=True, message='Место на складе успешно удалено')

    async def get_code_of_place(self, place_id: int) -> str:
        """Build the dotted code of a place from its chain of parents.

        The code lists the ``number`` of every place on the path, topmost
        ancestor first and the requested place last, joined with ``'.'``.

        Returns:
            The code, or an empty string when the place does not exist.
        """
        select_depth = 10
        # Eager-load the chain of parents: one initial level plus
        # ``select_depth`` chained levels.
        # NOTE(review): ancestors beyond this chain would need a lazy load
        # while walking ``place.parent`` -- confirm the hierarchy never gets
        # that deep.
        option = selectinload(Place.parent)
        for _ in range(select_depth):
            option = option.selectinload(Place.parent)
        stmt = (
            select(Place)
            .where(Place.id == place_id)
            .options(option)
        )
        places = (await self.session.scalars(stmt)).all()
        if not places:
            return ''

        numbers: list[str] = []
        place = places[0]
        while place:
            numbers.append(str(place.number))
            place = place.parent
        # Collected leaf-to-root; the code reads root-to-leaf.
        return '.'.join(reversed(numbers))