Files
Sipro-Stocks/alembic/env.py
2024-06-29 22:30:23 +03:00

120 lines
3.4 KiB
Python

import asyncio
import logging
from logging.config import fileConfig

from alembic import context
from sqlalchemy.engine import Connection

import backend.config as settings
from backend.session import engine
from models import BaseModel
# Alembic Config object; provides access to the values within the .ini
# file in use.
config = context.config

# Configure Python logging from the config file, when one was supplied.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Created after fileConfig() so the logger picks up the configured handlers.
logger = logging.getLogger("alembic.env")

# MetaData of the project's models; this is what 'autogenerate' compares
# the database against.
target_metadata = BaseModel.metadata

# Report the schema through logging instead of print(): in offline mode the
# generated SQL is written to the script output (stdout by default), so a
# print() here would end up inside the generated script.
logger.info("Target metadata schema: %s", target_metadata.schema)

# Other values from the config, defined by the needs of env.py, can be
# acquired like this:
# my_important_option = config.get_main_option("my_important_option")
def include_object(object, name, type_, reflected, compare_to):
    """Decide whether a schema object takes part in the migration run.

    Used as the ``include_object`` filter passed to ``context.configure``.
    Tables whose schema differs from ``target_metadata.schema`` are
    excluded; every other object (columns, indexes, tables in the target
    schema, ...) is included.

    Only ``object`` and ``type_`` are inspected; the remaining parameters
    are accepted but not used.

    Returns:
        ``False`` for a table outside the target schema, ``True`` otherwise.
    """
    # Anything that is not a table is always kept.
    if type_ != 'table':
        return True

    belongs_elsewhere = object.schema != target_metadata.schema
    return not belongs_elsewhere
def get_url():
    """Return the database URL with the connection settings filled in.

    The ``sqlalchemy.url`` main option is used as a ``str.format`` template
    whose ``PG_LOGIN``, ``PG_PASSWORD``, ``PG_HOST`` and ``PG_DATABASE``
    placeholders are replaced with the values from ``backend.config``.
    """
    render = config.get_main_option("sqlalchemy.url").format
    placeholders = {
        "PG_LOGIN": settings.PG_LOGIN,
        "PG_PASSWORD": settings.PG_PASSWORD,
        "PG_HOST": settings.PG_HOST,
        "PG_DATABASE": settings.PG_DATABASE,
    }
    return render(**placeholders)
def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    This configures the context with just a URL and not an Engine, though
    an Engine is acceptable here as well. By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the script
    output.
    """
    context.configure(
        url=get_url(),
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
        # Same version table and object filter as do_run_migrations(), so
        # that the generated SQL tracks revisions in the table online mode
        # reads instead of Alembic's default "alembic_version".
        version_table="asia_alembic_version",
        version_table_schema=target_metadata.schema,
        include_schemas=True,
        include_object=include_object,
    )

    with context.begin_transaction():
        # By default search_path is set to "$user",public, which is why
        # Alembic can't create foreign keys correctly; pin it to public.
        context.execute('SET search_path TO public')
        context.run_migrations()
def do_run_migrations(connection: Connection) -> None:
    """Configure the migration context on ``connection`` and run migrations.

    The Alembic version is stored in the ``asia_alembic_version`` table,
    placed in the schema of ``target_metadata``. Schemas are included and
    objects are filtered through ``include_object``. The search_path is not
    modified here.

    Args:
        connection: Synchronous connection the migrations are executed on.
    """
    migration_options = {
        "target_metadata": target_metadata,
        # Table in the database that records this schema's current revision.
        "version_table": "asia_alembic_version",
        # Could be "public" instead to keep version tables in their own schema.
        "version_table_schema": target_metadata.schema,
        "include_schemas": True,
        "include_object": include_object,
    }
    context.configure(connection=connection, **migration_options)

    with context.begin_transaction():
        context.run_migrations()
async def run_async_migrations() -> None:
    """Run the migrations on a connection taken from the project's engine.

    Opens a connection on ``engine``, runs ``do_run_migrations`` on it via
    ``run_sync`` and then disposes of the engine.
    """
    connectable = engine

    try:
        async with connectable.connect() as connection:
            await connection.run_sync(do_run_migrations)
    finally:
        # Dispose of the engine even when a migration fails, so its
        # connections are not left open on the error path.
        await connectable.dispose()
def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    Drives the ``run_async_migrations`` coroutine to completion with
    ``asyncio.run``.
    """
    migration_coroutine = run_async_migrations()
    asyncio.run(migration_coroutine)
# Pick the migration runner that matches the mode Alembic was invoked in.
if not context.is_offline_mode():
    run_migrations_online()
else:
    run_migrations_offline()