feat: async pytest, testcases for starting and finishing shifts
This commit is contained in:
103
tests/fixture_loader.py
Normal file
103
tests/fixture_loader.py
Normal file
@@ -0,0 +1,103 @@
|
||||
import json
|
||||
import os
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy import text, Table, insert
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from models import User, Role, WorkShift, PayrollScheme, PayRate, user_pay_rate
|
||||
from models.work_shifts import WorkShiftPause
|
||||
|
||||
|
||||
class FixtureLoader:
|
||||
def __init__(self, fixture_path: str = "fixtures"):
|
||||
project_root = Path(__file__).parent
|
||||
self.fixture_path = project_root / fixture_path
|
||||
|
||||
@staticmethod
|
||||
def _fixtures_to_load() -> list[tuple[str, Any]]:
|
||||
return [
|
||||
("roles", Role),
|
||||
("payroll_schemas", PayrollScheme),
|
||||
("pay_rates", PayRate),
|
||||
("users", User),
|
||||
("work_shifts", WorkShift),
|
||||
("work_shift_pauses", WorkShiftPause),
|
||||
]
|
||||
|
||||
@staticmethod
|
||||
def _many_to_many_fixtures() -> list[tuple[str, Table]]:
|
||||
return [
|
||||
("user_pay_rates", user_pay_rate),
|
||||
]
|
||||
|
||||
async def load_fixtures(self, db: AsyncSession):
|
||||
file_postfix = ".json"
|
||||
for fixture_file, model in self._fixtures_to_load():
|
||||
await self._load_model_fixtures(db, fixture_file + file_postfix, model)
|
||||
|
||||
for fixture_file, table in self._many_to_many_fixtures():
|
||||
await self._load_m2m_fixtures(db, fixture_file + file_postfix, table)
|
||||
|
||||
async def _load_model_fixtures(
|
||||
self,
|
||||
db: AsyncSession,
|
||||
fixture_file: str,
|
||||
model: Any,
|
||||
):
|
||||
"""Load fixtures for a specific model"""
|
||||
fixture_path = os.path.join(self.fixture_path, fixture_file)
|
||||
|
||||
if not os.path.exists(fixture_path):
|
||||
print(f"Fixture file {fixture_path} not found")
|
||||
return 0
|
||||
|
||||
with open(fixture_path, "r") as f:
|
||||
data = json.load(f)
|
||||
|
||||
for item_data in data:
|
||||
converted_data = {}
|
||||
for key, value in item_data.items():
|
||||
if isinstance(value, str) and len(value) == 19:
|
||||
try:
|
||||
# Try to parse as datetime
|
||||
converted_data[key] = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
|
||||
except ValueError:
|
||||
# If it fails, keep the original value
|
||||
converted_data[key] = value
|
||||
else:
|
||||
converted_data[key] = value
|
||||
|
||||
db_item = model(**converted_data)
|
||||
db.add(db_item)
|
||||
|
||||
await db.commit()
|
||||
|
||||
async def _load_m2m_fixtures(
|
||||
self,
|
||||
db: AsyncSession,
|
||||
fixture_file: str,
|
||||
table: Table,
|
||||
):
|
||||
"""Load fixtures for many-to-many association tables"""
|
||||
fixture_path = os.path.join(self.fixture_path, fixture_file)
|
||||
|
||||
if not os.path.exists(fixture_path):
|
||||
print(f"Fixture file {fixture_path} not found")
|
||||
return 0
|
||||
|
||||
with open(fixture_path, "r") as f:
|
||||
data = json.load(f)
|
||||
|
||||
# Use SQLAlchemy insert for association tables
|
||||
if data:
|
||||
await db.execute(insert(table), data)
|
||||
await db.commit()
|
||||
|
||||
async def clear_fixtures(self, db: AsyncSession):
|
||||
"""Clear all fixture data (useful for testing)"""
|
||||
for fixture_file, _ in self._fixtures_to_load()[::-1]:
|
||||
await db.execute(text("DELETE FROM " + fixture_file))
|
||||
await db.commit()
|
||||
Reference in New Issue
Block a user