Files
Fulfillment-Backend/card_attributes/handlers/card_attributes_query_handler.py

79 lines
2.8 KiB
Python

from typing import Optional
from sqlalchemy import select, and_
from sqlalchemy.ext.asyncio import AsyncResult
from sqlalchemy.orm import selectinload
from card_attributes.handlers.base_handler import BaseHandler
from models import Attribute, project_attribute, card_relations, Card, Project, Board
class CardAttributesQueryHandler(BaseHandler):
async def get_attributes_for_project(self, project_id: int) -> list[Attribute]:
stmt = (
select(Attribute)
.join(project_attribute, project_attribute.c.attribute_id == Attribute.id)
.where(
project_attribute.c.project_id == project_id,
Attribute.is_deleted == False,
)
)
attributes = (await self.session.scalars(stmt)).all()
return list(attributes)
async def get_attr_by_name(self, attr_name: str) -> Optional[Attribute]:
stmt = (
select(Attribute)
.options(
selectinload(Attribute.projects),
)
.where(
Attribute.name == attr_name,
Attribute.is_deleted == False,
)
)
attribute = (await self.session.scalars(stmt)).first()
return attribute
async def get_card_ids_by_group_id(self, group_id: int) -> list[int]:
stmt = (
select(card_relations.c.card_id)
.where(card_relations.c.group_id == group_id)
)
ids = await self.session.scalars(stmt)
return list(ids)
async def get_all_cards_for_project(self, project_id: int) -> AsyncResult[Card]:
stmt = (
select(Card)
.join(Board)
.join(Project)
.where(Project.id == project_id)
.options(selectinload(Card.attributes))
.execution_options(yield_per=100)
)
rows: AsyncResult[tuple[Card]] = await self.session.stream(stmt)
async for row in rows:
yield row[0]
async def get_project_attr(self, project_id: int, attribute_id: int) -> project_attribute:
stmt_is_attribute_already_added = (
select(project_attribute)
.where(
and_(
project_attribute.c.project_id == project_id,
project_attribute.c.attribute_id == attribute_id,
)
)
)
project_attribute_inst = await self.session.execute(stmt_is_attribute_already_added)
return project_attribute_inst.first()
async def get_attributes_by_ids(self, attribute_ids: list[int]) -> list[Attribute]:
stmt = (
select(Attribute)
.where(Attribute.id.in_(attribute_ids))
)
attributes = (await self.session.scalars(stmt)).all()
return list(attributes)