Files
Fulfillment-Backend/services/card_tag.py

123 lines
4.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from sqlalchemy import select
from sqlalchemy.orm import selectinload
from models import CardTag, Card, CardGroup
from schemas.card_tag import *
from services.base import BaseService
class CardTagService(BaseService):
    """Create, rename, soft-delete and (un)assign card tags.

    Every method talks to the database through ``self.session`` (awaited
    everywhere, so it is used as an async SQLAlchemy session). The public
    CRUD methods report failures through the ``ok``/``message`` fields of
    their response objects instead of raising.
    """

    async def _get_by_name_and_project_id(self, name: str, project_id: int) -> Optional[CardTag]:
        """Return the first non-deleted tag called ``name`` in ``project_id``.

        Returns ``None`` when no such tag exists.
        """
        stmt = select(CardTag).where(
            CardTag.name == name,
            CardTag.project_id == project_id,
            # SQL expression, not a Python comparison: ``== False`` is intended.
            CardTag.is_deleted == False,  # noqa: E712
        )
        return (await self.session.scalars(stmt)).first()

    async def create(self, request: CreateTagRequest) -> CreateTagResponse:
        """Create a tag unless a live tag with the same name exists in the project.

        Commits the session on success.
        """
        existing_tag = await self._get_by_name_and_project_id(
            request.tag.name,
            request.tag.project_id,
        )
        if existing_tag:
            # Fixed: this branch used to return an UpdateTagResponse, which
            # contradicts the declared return type of this method.
            return CreateTagResponse(ok=False, message='Тег с таким названием уже существует')

        tag = CardTag(name=request.tag.name, project_id=request.tag.project_id)
        self.session.add(tag)
        await self.session.commit()
        return CreateTagResponse(ok=True, message='Тег успешно создан')

    async def update(self, request: UpdateTagRequest) -> UpdateTagResponse:
        """Rename the tag identified by ``request.tag.id``.

        The rename is refused when another live tag in the same project
        already carries the requested name. Commits the session on success.
        """
        card_tag = await self.session.get(CardTag, request.tag.id)
        if not card_tag:
            return UpdateTagResponse(ok=False, message=f'Тег с ID {request.tag.id} не найден')

        if card_tag.name != request.tag.name:
            # Uniqueness is checked inside the project the stored tag belongs
            # to; only the name is updated here, so the project id sent by
            # the client is not what decides the scope of the check.
            existing_tag = await self._get_by_name_and_project_id(
                request.tag.name,
                card_tag.project_id,
            )
            if existing_tag:
                return UpdateTagResponse(ok=False, message='Тег с таким названием уже существует')
            card_tag.name = request.tag.name

        await self.session.commit()
        return UpdateTagResponse(ok=True, message='Тег успешно обновлен')

    async def delete(self, card_tag_id: int) -> DeleteTagResponse:
        """Soft-delete a tag by setting ``is_deleted`` and committing."""
        card_tag = await self.session.get(CardTag, card_tag_id)
        if not card_tag:
            return DeleteTagResponse(ok=False, message=f'Тег с ID {card_tag_id} не найден')

        card_tag.is_deleted = True
        await self.session.commit()
        return DeleteTagResponse(ok=True, message='Тег успешно удален')

    async def _switch_tag_in_card(self, card_tag: CardTag, card_id: int) -> tuple[bool, str]:
        """Toggle ``card_tag`` on a single card.

        Returns ``(ok, message)``; ``ok`` is ``False`` only when the card does
        not exist. Does not commit - the caller does.
        """
        stmt = (
            select(Card)
            .options(selectinload(Card.tags))
            .where(Card.id == card_id)
        )
        card: Optional[Card] = (await self.session.scalars(stmt)).first()
        if not card:
            return False, f'Карточка с ID {card_id} не найдена'

        if card_tag in card.tags:
            card.tags.remove(card_tag)
            return True, 'Тег откреплен от карточки'

        card.tags.append(card_tag)
        return True, 'Тег прикреплен к карточке'

    async def _get_cards_by_group_id(self, group_id: int) -> list[Card]:
        """Return the cards of the group ``group_id``, or ``[]`` if it does not exist."""
        # NOTE(review): unlike _switch_tag_in_card, nothing is eager-loaded
        # here, yet callers read ``group.cards`` and ``card.tags``. This
        # presumably relies on the relationships' loader strategy in the
        # models - confirm that no implicit lazy load can happen here.
        group: Optional[CardGroup] = await self.session.scalar(
            select(CardGroup).where(CardGroup.id == group_id)
        )
        return group.cards if group else []

    async def _switch_tag_in_group(self, card_tag: CardTag, group_id: int) -> tuple[bool, str]:
        """Toggle ``card_tag`` independently on every card of a group.

        Each card is flipped on its own: cards that have the tag lose it,
        cards that lack it gain it. Always returns ``ok=True`` (an unknown
        group simply yields no cards). Does not commit - the caller does.
        """
        for card in await self._get_cards_by_group_id(group_id):
            if card_tag in card.tags:
                card.tags.remove(card_tag)
            else:
                card.tags.append(card_tag)
        return True, 'Теги группы изменены'

    async def switch_tag(self, request: SwitchTagRequest) -> SwitchTagResponse:
        """Toggle a tag on one card (``request.card_id``) or on a whole group.

        When ``request.card_id`` is falsy the group identified by
        ``request.group_id`` is used instead. Commits only when the toggle
        succeeded.
        """
        card_tag: Optional[CardTag] = await self.session.get(CardTag, request.tag_id)
        if not card_tag:
            return SwitchTagResponse(ok=False, message=f'Тег с ID {request.tag_id} не найден')

        if request.card_id:
            ok, message = await self._switch_tag_in_card(card_tag, request.card_id)
        else:
            ok, message = await self._switch_tag_in_group(card_tag, request.group_id)

        # Fixed: the helper's ``ok`` flag used to be dropped and the response
        # was always ok=True, so a missing card was reported as a success.
        if ok:
            await self.session.commit()
        return SwitchTagResponse(ok=ok, message=message)

    async def apply_tags(self, card: Card, tag_names: list[str]) -> None:
        """Replace the tags of ``card`` with the live project tags named in ``tag_names``.

        When the card belongs to a group, every card of that group receives
        the same tag set. Names that match no live tag of the card's project
        are silently ignored. Only flushes; committing is left to the caller.
        """
        stmt = select(CardTag).where(
            CardTag.name.in_(tag_names),
            # SQL expression, not a Python comparison: ``== False`` is intended.
            CardTag.is_deleted == False,  # noqa: E712
            CardTag.project_id == card.board.project_id,
        )
        tags = (await self.session.scalars(stmt)).all()

        if card.group:
            targets = await self._get_cards_by_group_id(card.group.id)
        else:
            targets = [card]

        # A separate loop variable keeps the ``card`` argument from being shadowed.
        for target in targets:
            target.tags = tags

        await self.session.flush()