Skip to content

Runner

pelican._context.use_context

use_context(runner=None, *, database_url=None)

Activate a runner and registry for the duration of a `with` block.

Example

from pelican import loader, use_context

database_url = "postgresql://user:password@localhost/mydb"

# The runner yielded here is the one activated for the body of the block.
with use_context(database_url=database_url) as runner:
    for migration in loader.load_migrations("db/migrations"):
        runner.upgrade(migration)
Source code in pelican/_context.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
@contextmanager
def use_context(
    runner: MigrationRunner | None = None,
    *,
    database_url: str | None = None,
) -> Iterator[MigrationRunner]:
    """Activate a runner and registry for the duration of a `with` block.

    When `runner` is omitted, a new `MigrationRunner` is built from
    `database_url`. When `runner` is supplied it is used as-is and
    `database_url` is not consulted.

    A fresh `MigrationRegistry` is activated alongside the runner on every
    call. Both previously active values are restored when the block exits,
    including when it exits because of an exception.

    ## Example

    ```python
    from pelican import use_context
    from pelican import loader

    with use_context(database_url="postgresql://user:password@localhost/mydb") as runner:
        registry = loader.load_migrations("db/migrations")
        for migration in registry:
            runner.upgrade(migration)
    ```
    """
    # Compare against None explicitly so a caller-supplied runner is never
    # swapped for a new one merely because it evaluates as falsy.
    if runner is None:
        runner = MigrationRunner(database_url=database_url)
    registry = MigrationRegistry()

    # Each activation is paired with its own `finally`, so the runner is
    # restored even if restoring the registry fails.
    r_token = _active_runner.set(runner)
    try:
        reg_token = _active_registry.set(registry)
        try:
            yield runner
        finally:
            _active_registry.reset(reg_token)
    finally:
        _active_runner.reset(r_token)

pelican.runner.MigrationRunner

MigrationRunner(database_url=None)
Source code in pelican/runner.py
46
47
48
49
50
51
52
53
def __init__(self, database_url: str | None = None) -> None:
    """Create a runner and configure its database URL when one is available.

    A non-empty `database_url` argument takes precedence; otherwise the
    `DATABASE_URL` environment variable is consulted. If neither yields a
    value, no URL is assigned.
    """
    # Private state starts out empty.
    self._database_url: str | None = None
    self._engine: Engine | None = None
    self._compiler: DialectCompiler | None = None

    self.metadata: MetaData = SQLModel.metadata

    # `or` treats an empty-string argument the same as a missing one, so the
    # environment variable is used as the fallback in both cases.
    url = database_url or environ.get("DATABASE_URL")
    if url:
        # Assigned through the public `database_url` attribute rather than
        # the private field set above.
        self.database_url = url

metadata instance-attribute

metadata = metadata

database_url property writable

database_url

has_database_url property

has_database_url

Whether a database URL has been configured on this runner.

engine property

engine

compiler property

compiler

get_applied_versions

get_applied_versions()
Source code in pelican/runner.py
82
83
84
85
86
87
def get_applied_versions(self) -> Iterator[int]:
    """Yield each value selected from `_SchemaMigration.version` as an `int`.

    This is a generator: nothing runs, including the version-table check,
    until the caller starts iterating. The session remains open while values
    are still being consumed.
    """
    self._ensure_version_table_exists()

    with Session(self.engine) as session:
        rows = session.exec(select(_SchemaMigration.version))
        yield from map(int, rows)

upgrade

upgrade(migration)
Source code in pelican/runner.py
89
90
91
92
93
94
def upgrade(self, migration: Migration) -> None:
    """Run a migration's `up` callable, then record its revision as applied.

    The revision is recorded only after `up` returns, so a failing upgrade
    leaves nothing recorded.

    ## Raises

    `ValueError` if the migration's `up` attribute is falsy.
    """
    apply_up = migration.up
    if not apply_up:
        raise ValueError("Migration has no upgrade function")

    apply_up()
    self._record_applied(migration.revision)

downgrade

downgrade(migration)
Source code in pelican/runner.py
 96
 97
 98
 99
100
101
def downgrade(self, migration: Migration) -> None:
    """Run a migration's `down` callable, then record its revision as unapplied.

    The revision is recorded only after `down` returns, so a failing
    downgrade leaves the recorded state untouched.

    ## Raises

    `ValueError` if the migration's `down` attribute is falsy.
    """
    apply_down = migration.down
    if not apply_down:
        raise ValueError("Migration has no downgrade function")

    apply_down()
    self._record_unapplied(migration.revision)

execute

execute(ddls)
Source code in pelican/runner.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def execute(self, ddls: Iterable[str | Executable | TextClause]) -> None:
    """Run a batch of DDL statements on one connection, inside one `begin()` block.

    Every item is turned into an `(sql, params)` pair before a connection is
    opened, so an unsupported item aborts the call before anything is sent.
    Plain strings are passed through with empty parameters; `DDLElement` and
    `TextClause` objects are compiled against the engine's dialect.

    ## Raises

    `TypeError` if an item is not a `str`, `DDLElement`, or `TextClause`.
    """
    # NOTE(review): the annotation admits any `Executable`, but only
    # `DDLElement` and `TextClause` instances pass the check below; confirm
    # which of the two is intended.
    pending: list[tuple[str, dict]] = []

    for item in ddls:
        if isinstance(item, str):
            pending.append((item, {}))
            continue

        if not isinstance(item, (DDLElement, TextClause)):
            raise TypeError(f"Unsupported DDL type: {type(item)}")

        rendered = item.compile(dialect=self.engine.dialect)
        pending.append((rendered.string, rendered.params or {}))

    with self.engine.connect() as conn, conn.begin():
        for statement, parameters in pending:
            conn.exec_driver_sql(statement, parameters)

execute_operations

execute_operations(operations)
Source code in pelican/runner.py
122
123
124
125
126
127
128
129
def execute_operations(self, operations: Iterable["Operation"]) -> None:
    """Compile every operation and run all resulting DDL in one `execute` call.

    Each operation is compiled with this runner's `compiler`. The statements
    are collected in operation order and passed to `execute` together, so
    `execute` is invoked exactly once, even when there are no operations.
    """
    statements = [
        ddl
        for operation in operations
        for ddl in operation.compile(self.compiler)
    ]
    self.execute(statements)