Files
Fulfillment-Backend/services/card_tag.py
2025-03-13 19:29:15 +04:00

144 lines
5.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from sqlalchemy import select
from sqlalchemy.orm import selectinload
from models import CardTag, Card, CardGroup, CardTagColor
from schemas.card_tag import *
from services.base import BaseService
class CardTagService(BaseService):
async def _get_by_name_and_project_id(self, name: str, project_id: int) -> Optional[CardTag]:
stmt = (
select(CardTag)
.where(
CardTag.name == name,
CardTag.project_id == project_id,
CardTag.is_deleted == False,
)
)
card_tag = await self.session.scalars(stmt)
return card_tag.first()
async def create(self, request: CreateTagRequest) -> CreateTagResponse:
existing_tag = await self._get_by_name_and_project_id(request.tag.name, request.tag.project_id)
if existing_tag:
return UpdateTagResponse(ok=False, message='Тег с таким названием уже существует')
tag = CardTag(
name=request.tag.name,
project_id=request.tag.project_id,
tag_color_id=request.tag.tag_color_id,
)
self.session.add(tag)
await self.session.commit()
return CreateTagResponse(ok=True, message='Тег успешно создан')
async def update(self, request: UpdateTagRequest) -> UpdateTagResponse:
card_tag = await self.session.get(CardTag, request.tag.id)
if not card_tag:
return UpdateTagResponse(ok=False, message=f'Тег с ID {request.tag.id} не найден')
if card_tag.name != request.tag.name:
existing_tag = await self._get_by_name_and_project_id(request.tag.name, request.tag.project_id)
if existing_tag:
return UpdateTagResponse(ok=False, message='Тег с таким названием уже существует')
card_tag.name = request.tag.name
card_tag.tag_color_id = request.tag.tag_color_id
await self.session.commit()
return UpdateTagResponse(ok=True, message='Тег успешно обновлен')
async def delete(self, card_tag_id: int) -> DeleteTagResponse:
card_tag = await self.session.get(CardTag, card_tag_id)
if not card_tag:
return DeleteTagResponse(ok=False, message=f'Тег с ID {card_tag_id} не найден')
card_tag.is_deleted = True
await self.session.commit()
return DeleteTagResponse(ok=True, message='Тег успешно удален')
async def _switch_tag_in_card(self, card_tag: CardTag, card_id: int) -> tuple[bool, str]:
stmt = (
select(Card)
.options(selectinload(Card.tags))
.where(Card.id == card_id)
)
card: Optional[Card] = (await self.session.scalars(stmt)).first()
if not card:
return False, f'Карточка с ID {card_id} не найдена'
if card_tag in card.tags:
card.tags.remove(card_tag)
return True, 'Тег откреплен от карточки'
card.tags.append(card_tag)
return True, 'Тег прикреплен к карточке'
async def _get_cards_by_group_id(self, group_id) -> list[Card]:
group: CardGroup | None = await self.session.scalar(
select(CardGroup)
.where(CardGroup.id == group_id)
.options(
selectinload(CardGroup.cards).noload(Card.products),
selectinload(CardGroup.cards).noload(Card.services),
selectinload(CardGroup.cards).noload(Card.shipping_warehouse),
selectinload(CardGroup.cards).noload(Card.client),
selectinload(CardGroup.cards).noload(Card.group),
)
)
return group.cards if group else []
async def _switch_tag_in_group(self, card_tag: CardTag, group_id: int):
cards = await self._get_cards_by_group_id(group_id)
for card in cards:
if card_tag in card.tags:
card.tags.remove(card_tag)
else:
card.tags.append(card_tag)
return True, 'Теги группы изменены'
async def switch_tag(self, request: SwitchTagRequest) -> SwitchTagResponse:
card_tag: Optional[CardTag] = await self.session.get(CardTag, request.tag_id)
if not card_tag:
return SwitchTagResponse(ok=False, message=f'Тег с ID {request.tag_id} не найден')
if request.card_id:
ok, message = await self._switch_tag_in_card(card_tag, request.card_id)
else:
ok, message = await self._switch_tag_in_group(card_tag, request.group_id)
await self.session.commit()
return SwitchTagResponse(ok=True, message=message)
async def apply_tags(self, card: Card, tag_names: list[str]):
stmt = (
select(CardTag)
.where(
CardTag.name.in_(tag_names),
CardTag.is_deleted == False,
CardTag.project_id == card.board.project_id
)
)
tags = (await self.session.scalars(stmt)).all()
cards = [card]
if card.group:
cards = await self._get_cards_by_group_id(card.group.id)
for card in cards:
card.tags = tags
await self.session.flush()
async def get_tag_colors(self) -> GetTagColorsResponse:
stmt = (
select(CardTagColor)
.where(CardTagColor.is_deleted==False)
)
colors = (await self.session.scalars(stmt)).all()
return GetTagColorsResponse(colors=colors)