223 lines
7.5 KiB
Python
223 lines
7.5 KiB
Python
from sqlalchemy import select, delete, func, literal
|
|
from sqlalchemy.orm import aliased, selectinload
|
|
|
|
from models_wms import PlaceType, Place
|
|
from schemas.warehouse_management import *
|
|
from services.base import BaseService
|
|
|
|
|
|
class WmsService(BaseService):
|
|
@staticmethod
|
|
def create_places_from_list(nodes: list) -> list:
|
|
node_map = {node.id: node for node in nodes}
|
|
root_nodes = []
|
|
|
|
for node in node_map.values():
|
|
parent_id = node.parent_id
|
|
if parent_id is None:
|
|
root_nodes.append(node)
|
|
else:
|
|
parent = node_map.get(parent_id)
|
|
if parent:
|
|
parent.children.append(node)
|
|
|
|
return root_nodes
|
|
|
|
async def get_place_types(self) -> GetPlaceTypesResponse:
|
|
pt_alias = aliased(PlaceType)
|
|
|
|
base_query = (
|
|
select(
|
|
PlaceType.id,
|
|
PlaceType.name,
|
|
PlaceType.parent_id,
|
|
func.count(Place.id).label("count")
|
|
)
|
|
.join(Place)
|
|
.group_by(PlaceType.id, PlaceType.parent_id, PlaceType.name)
|
|
.where(PlaceType.parent_id == None)
|
|
)
|
|
|
|
recursive_query = select(
|
|
pt_alias.id,
|
|
pt_alias.name,
|
|
pt_alias.parent_id,
|
|
literal(0).label("count")
|
|
).join(
|
|
PlaceType,
|
|
PlaceType.id == pt_alias.parent_id,
|
|
isouter=True,
|
|
)
|
|
|
|
cte = base_query.union_all(recursive_query).cte(recursive=True)
|
|
|
|
stmt = select(cte)
|
|
result = await self.session.execute(stmt)
|
|
|
|
place_types = []
|
|
for type_id, name, parent_id, places_count in result:
|
|
place_type = PlaceTypeSchema(
|
|
id=type_id,
|
|
name=name,
|
|
children=[],
|
|
parent_id=parent_id,
|
|
places_count=places_count,
|
|
)
|
|
place_types.append(place_type)
|
|
|
|
place_types = WmsService.create_places_from_list(place_types)
|
|
|
|
return GetPlaceTypesResponse(place_types=place_types)
|
|
|
|
async def get_flat_place_types(self, parent_place_type_id: int) -> GetFlatPlaceTypesResponse:
|
|
stmt = select(PlaceType)
|
|
if parent_place_type_id != -1:
|
|
stmt = stmt.where(PlaceType.parent_id == parent_place_type_id)
|
|
else:
|
|
stmt = stmt.where(PlaceType.parent_id == None)
|
|
|
|
place_types = (await self.session.scalars(stmt)).all()
|
|
|
|
return GetFlatPlaceTypesResponse(place_types=place_types)
|
|
|
|
async def create_place_type(self, request: CreatePlaceTypeRequest) -> CreatePlaceTypeResponse:
|
|
place_type = PlaceType(
|
|
name=request.place_type.name,
|
|
parent_id=request.place_type.parent_id,
|
|
)
|
|
self.session.add(place_type)
|
|
await self.session.commit()
|
|
return CreatePlaceTypeResponse(ok=True, message='Тип места на складе успешно создан')
|
|
|
|
async def edit_place_type(self, request: EditPlaceTypeRequest) -> EditPlaceTypeResponse:
|
|
place_type = await self.session.get(PlaceType, request.place_type.id)
|
|
place_type.name = request.place_type.name
|
|
await self.session.commit()
|
|
return EditPlaceTypeResponse(ok=True, message='Тип места на складе успешно изменен')
|
|
|
|
async def delete_place_type(self, place_type_id: int) -> DeletePlaceTypeResponse:
|
|
stmt = (
|
|
delete(PlaceType)
|
|
.where(PlaceType.id == place_type_id)
|
|
)
|
|
await self.session.execute(stmt)
|
|
await self.session.commit()
|
|
return DeletePlaceTypeResponse(ok=True, message='Тип места на складе успешно удален')
|
|
|
|
async def get_raw_places(self) -> list[dict]:
|
|
pt_alias = aliased(Place)
|
|
|
|
base_query = select(Place)
|
|
|
|
recursive_query = (
|
|
select(pt_alias)
|
|
.join(
|
|
Place,
|
|
Place.id == pt_alias.parent_id
|
|
)
|
|
)
|
|
|
|
cte = base_query.union_all(recursive_query).cte(recursive=True)
|
|
recur_sub = select(cte).subquery()
|
|
|
|
place_type_child = aliased(PlaceType)
|
|
|
|
place_type_count_sub = (
|
|
select(func.count(place_type_child.id).label("child_count"))
|
|
.where(PlaceType.id == place_type_child.parent_id)
|
|
.scalar_subquery()
|
|
)
|
|
place_type_sub = (
|
|
select(PlaceType, place_type_count_sub.label("child_count"))
|
|
.order_by(PlaceType.id)
|
|
.subquery()
|
|
)
|
|
|
|
res_stmt = (
|
|
select(recur_sub, place_type_sub)
|
|
.join(place_type_sub, place_type_sub.c.id == recur_sub.c.place_type_id)
|
|
)
|
|
|
|
result = await self.session.execute(res_stmt)
|
|
result_dicts = result.mappings().all()
|
|
return [*result_dicts]
|
|
|
|
async def get_places(self) -> GetPlacesResponse:
|
|
result = await self.get_raw_places()
|
|
|
|
places = []
|
|
for place in result:
|
|
place_type = BasePlaceTypeWithCountSchema(
|
|
child_count=place['child_count'],
|
|
parent_id=place['parent_id_1'],
|
|
name=place['name'],
|
|
)
|
|
place = PlaceSchema(
|
|
id=place['id'],
|
|
number=place['number'],
|
|
children=[],
|
|
parent_id=place['parent_id'],
|
|
place_type=place_type,
|
|
place_type_id=place['place_type_id'],
|
|
)
|
|
places.append(place)
|
|
|
|
places = WmsService.create_places_from_list(places)
|
|
|
|
return GetPlacesResponse(places=places)
|
|
|
|
async def create_place(self, request: CreatePlaceRequest) -> CreatePlaceResponse:
|
|
stmt = (
|
|
select(func.max(Place.number))
|
|
.where(Place.parent_id == request.place.parent_id)
|
|
)
|
|
last_number = await self.session.scalar(stmt)
|
|
last_number = last_number if last_number else 0
|
|
|
|
place = Place(
|
|
number=last_number + 1,
|
|
parent_id=request.place.parent_id,
|
|
place_type_id=request.place.place_type_id,
|
|
)
|
|
self.session.add(place)
|
|
await self.session.commit()
|
|
return CreatePlaceResponse(ok=True, message='Место на складе успешно создано')
|
|
|
|
async def edit_place(self, request: EditPlaceRequest) -> EditPlaceResponse:
|
|
place = await self.session.get(Place, request.place.id)
|
|
place.name = request.place.name
|
|
await self.session.commit()
|
|
return EditPlaceResponse(ok=True, message='Место на складе успешно изменено')
|
|
|
|
async def delete_place(self, place_id: int) -> DeletePlaceResponse:
|
|
stmt = (
|
|
delete(Place)
|
|
.where(Place.id == place_id)
|
|
)
|
|
await self.session.execute(stmt)
|
|
await self.session.commit()
|
|
return DeletePlaceResponse(ok=True, message='Место на складе успешно удалено')
|
|
|
|
async def get_code_of_place(self, place_id: int) -> str:
|
|
select_depth = 10
|
|
option = selectinload(Place.parent)
|
|
for _ in range(select_depth):
|
|
option = option.selectinload(Place.parent)
|
|
stmt = (
|
|
select(Place)
|
|
.where(Place.id == place_id)
|
|
.options(option)
|
|
)
|
|
places = (await self.session.scalars(stmt)).all()
|
|
|
|
numbers: list[str] = []
|
|
if len(places) == 0:
|
|
return ''
|
|
|
|
place = places[0]
|
|
while place:
|
|
numbers.append(str(place.number))
|
|
place = place.parent
|
|
|
|
return '.'.join(numbers[::-1])
|