Skip to content

Runner

pelican.runner.MigrationRunner

MigrationRunner()
Source code in pelican/runner.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def __init__(self) -> None:
    """Set up the engine, metadata and dialect compiler for this runner.

    Calls ``load_dotenv(".env")`` first, then reads ``DATABASE_URL`` from
    ``environ`` (falling back to ``sqlite:///database.db``) and creates an
    engine from it. The compiler class is looked up in
    ``_DIALECT_COMPILERS`` by the engine's dialect name and instantiated
    with the engine. Finally ``_ensure_version_table_exists`` is invoked.

    Raises:
        ValueError: If no compiler is registered for the engine's dialect.
    """
    load_dotenv(".env")

    url = environ.get("DATABASE_URL", "sqlite:///database.db")
    self.database_url: str = url
    self.engine: Engine = create_engine(url)
    self.metadata: MetaData = SQLModel.metadata

    # Pick the compiler implementation that matches the engine's dialect.
    dialect = self.engine.dialect.name
    factory = _DIALECT_COMPILERS.get(dialect)
    if not factory:
        supported = ", ".join(_DIALECT_COMPILERS)
        raise ValueError(
            f"Unsupported dialect: {dialect}. Supported dialects: {supported}"
        )

    self.compiler = factory(self.engine)
    self._ensure_version_table_exists()

database_url instance-attribute

database_url = environ.get('DATABASE_URL', 'sqlite:///database.db')

engine instance-attribute

engine = create_engine(database_url)

metadata instance-attribute

metadata = SQLModel.metadata

compiler instance-attribute

compiler = compiler_cls(engine)

get_applied_versions

get_applied_versions()
Source code in pelican/runner.py
52
53
54
55
def get_applied_versions(self) -> Iterator[int]:
    """Lazily yield every recorded migration version as an ``int``.

    Selects ``_SchemaMigration.version`` through a ``Session`` bound to
    ``self.engine``. Because this is a generator, the session remains open
    for as long as the caller keeps consuming values.
    """
    query = select(_SchemaMigration.version)
    with Session(self.engine) as session:
        yield from map(int, session.exec(query))

upgrade

upgrade(migration)
Source code in pelican/runner.py
57
58
59
60
61
62
def upgrade(self, migration: Migration) -> None:
    """Run the migration's ``up`` callable, then record its revision.

    The revision is handed to ``_record_applied`` only after ``up`` has
    returned.

    Raises:
        ValueError: If ``migration.up`` is falsy.
    """
    step = migration.up
    if step:
        step()
        self._record_applied(migration.revision)
    else:
        raise ValueError("Migration has no upgrade function")

downgrade

downgrade(migration)
Source code in pelican/runner.py
64
65
66
67
68
69
def downgrade(self, migration: Migration) -> None:
    """Run the migration's ``down`` callable, then un-record its revision.

    The revision is handed to ``_record_unapplied`` only after ``down`` has
    returned.

    Raises:
        ValueError: If ``migration.down`` is falsy.
    """
    step = migration.down
    if step:
        step()
        self._record_unapplied(migration.revision)
    else:
        raise ValueError("Migration has no downgrade function")

execute

execute(ddls)
Source code in pelican/runner.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def execute(self, ddls: Iterable[Union[str, Executable, TextClause]]) -> None:
    """Execute a batch of DDL statements inside one ``engine.begin()`` block.

    Plain strings are sent unchanged with an empty parameter dict.
    ``DDLElement`` and ``TextClause`` objects are compiled against the
    engine's dialect, and the compiled SQL string and parameters are used.

    Every item is normalised before ``engine.begin()`` is entered, so an
    unsupported item raises before any statement is sent.

    Raises:
        TypeError: If an item is neither a ``str``, a ``DDLElement`` nor a
            ``TextClause``.
    """

    def _normalise(item):
        # Raw SQL text needs no compilation.
        if isinstance(item, str):
            return item, {}
        if not isinstance(item, (DDLElement, TextClause)):
            raise TypeError(f"Unsupported DDL type: {type(item)}")
        rendered = item.compile(dialect=self.engine.dialect)
        return rendered.string, rendered.params or {}

    statements: list[tuple[str, dict]] = [_normalise(item) for item in ddls]

    with self.engine.begin() as connection:
        for statement, parameters in statements:
            connection.exec_driver_sql(statement, parameters)

execute_operations

execute_operations(operations)
Source code in pelican/runner.py
89
90
91
92
93
94
95
96
def execute_operations(self, operations: Iterable["Operation"]) -> None:
    """Compile each operation with ``self.compiler`` and run the results.

    The DDLs produced by all operations are flattened, in order, into one
    list that is passed to ``self.execute`` in a single call.
    """
    statements = [
        ddl
        for operation in operations
        for ddl in operation.compile(self.compiler)
    ]
    self.execute(statements)