Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
ecb20a4
Add psycopg package.
clokep Nov 9, 2023
c7e3120
Method to set statement timeout.
clokep Nov 15, 2023
91ef287
Separate PostgresEngine into Psycopg2Engine and PsycopgEngine.
clokep Nov 9, 2023
0a06be1
Run tests in CI against psycopg.
clokep Nov 9, 2023
260a5b7
Update user directory to handle psycopg3.
clokep Oct 9, 2024
ce8ad96
Support execute_values on psycopg3.
clokep Oct 9, 2024
93b1740
Fix-up simple_* tests.
clokep Oct 10, 2024
d49827d
Fix-up calls to end_to_end_keys.
clokep Oct 10, 2024
287f0a6
Use superclass version of `executescript()` (#1)
realtyem Oct 17, 2024
9f77ac4
Switch out formatting placeholder for what psycopg2 is expecting (#2)
realtyem Oct 17, 2024
3bbd562
Merge remote-tracking branch 'upstream/develop' into psycopg3
clokep Oct 23, 2024
f5b6429
Linting (and a fix) (#3)
realtyem Oct 23, 2024
7ff4584
Merge remote-tracking branch 'refs/remotes/origin/psycopg3' into psyc…
clokep Oct 23, 2024
5353f8d
Try running complement builds?
clokep Oct 24, 2024
1bec3d7
Merge branch 'develop' into psycopg3
realtyem Jul 21, 2025
f6c2364
Adjust type: ignore line to where mypy will apply it
realtyem Jul 21, 2025
279791d
Add both PsycopgEngine and Psycopg2Engine to database.engines.__all__…
realtyem Jul 21, 2025
1ceb332
Adjust _mark_state_groups_as_pending_deletion_txn() to use execute_ba…
realtyem Jul 21, 2025
95eb7f8
Adjust set_profile_field() insertion sql for a narrower type on param…
realtyem Jul 22, 2025
4ad733f
Try an update to the config schema?
realtyem Jul 22, 2025
f65a885
Adjust unit tests to reflect updated minimal versions of python/postgres
realtyem Jul 22, 2025
425971f
Minor changes to profile field
clokep Sep 30, 2025
61a3aaa
Merge remote-tracking branch 'upstream/develop' into psycopg3
clokep Sep 30, 2025
f31d8d2
Newsfragment
clokep Sep 30, 2025
ad229a9
Merge remote-tracking branch 'upstream/develop' into psycopg3
clokep Sep 30, 2025
2f4352b
poetry lock again
clokep Sep 30, 2025
467a8c4
Merge branch 'develop' into psycopg3
clokep Oct 6, 2025
e59ea08
Merge branch 'develop' into psycopg3-adjust-settings
realtyem Nov 23, 2025
d9226e0
Update poetry.lock file
realtyem Nov 23, 2025
0693794
linting fixups
realtyem Nov 23, 2025
dcc4b5a
Merge pull request #4 from realtyem/psycopg3-adjust-settings
clokep Nov 25, 2025
c195ee6
Merge branch 'develop' into jason/psycopg-merge-develop
jason-famedly Jan 30, 2026
99ff2ac
Adjust for execute_values() usage with fetch=False to only be appropr…
jason-famedly Jan 30, 2026
698a56d
Attempt forcing a lower case comparison to "psycopg" in complement.sh
jason-famedly Jan 31, 2026
088ead3
Add mini docstrings to the abstract postgres engine class, as well as…
jason-famedly Jan 31, 2026
123aeac
I forgot that the new driver is not wired up in the complement script…
jason-famedly Feb 2, 2026
680ffee
Merge pull request #6 from jason-famedly/jason/psycopg-merge-develop
clokep Feb 3, 2026
4cd7924
Use the 'c' extra instead of the 'pure python' version for C-compiled…
jason-famedly Feb 3, 2026
a320d8a
Update 'c' extra to 3.2.8 as a minimum. This avoids olddebian's packa…
jason-famedly Feb 19, 2026
d94d4c0
Allow complement to be ran with WORKERS=1 and still allow the psycopg…
jason-famedly Feb 19, 2026
73415a2
Merge pull request #7 from jason-famedly/jason/adjust-for-feedback
clokep Feb 19, 2026
0e083b1
Merge branch 'develop' into jason/update-psycopg3
jason-famedly Mar 30, 2026
63c7f03
update poetry lock
jason-famedly Mar 30, 2026
0af6799
Merge branch 'develop' into jason/update-psycopg3
jason-famedly Apr 6, 2026
5c4f8a9
Merge pull request #8 from jason-famedly/jason/update-psycopg3
clokep Apr 6, 2026
efcb28c
Merge branch 'develop' into jason/update-poetry-lock
jason-famedly Apr 28, 2026
59cd733
Merge pull request #9 from jason-famedly/jason/update-poetry-lock
clokep Apr 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion synapse/_scripts/synapse_port_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -1448,7 +1448,7 @@ def main() -> None:
if "name" not in postgres_config:
sys.stderr.write("Malformed database config: no 'name'\n")
sys.exit(2)
if postgres_config["name"] != "psycopg2":
if postgres_config["name"] not in ("psycopg", "psycopg2"):
sys.stderr.write("Database must use the 'psycopg2' connector.\n")
sys.exit(3)

Expand Down
2 changes: 1 addition & 1 deletion synapse/config/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class DatabaseConnectionConfig:
def __init__(self, name: str, db_config: dict):
db_engine = db_config.get("name", "sqlite3")

if db_engine not in ("sqlite3", "psycopg2"):
if db_engine not in ("sqlite3", "psycopg2", "psycopg"):
raise ConfigError("Unsupported database type %r" % (db_engine,))

if db_engine == "sqlite3":
Expand Down
21 changes: 18 additions & 3 deletions synapse/storage/engines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,35 @@
from typing import Any, Mapping, NoReturn

from ._base import BaseDatabaseEngine, IncorrectDatabaseSetup
from .postgres import PostgresEngine

# The classes `PostgresEngine` and `Sqlite3Engine` must always be importable, because
# we use `isinstance(engine, PostgresEngine)` to write different queries for postgres
# and sqlite. But the database driver modules are both optional: they may not be
# installed. To account for this, create dummy classes on import failure so we can
# still run `isinstance()` checks.
try:
from .postgres import PostgresEngine
from .psycopg2 import Psycopg2Engine
except ImportError:

class PostgresEngine(BaseDatabaseEngine): # type: ignore[no-redef]
class Psycopg2Engine(BaseDatabaseEngine): # type: ignore[no-redef]
def __new__(cls, *args: object, **kwargs: object) -> NoReturn:
raise RuntimeError(
f"Cannot create {cls.__name__} -- psycopg2 module is not installed"
)


try:
from .psycopg import PsycopgEngine
except ImportError:

class PsycopgEngine(BaseDatabaseEngine): # type: ignore[no-redef]
def __new__(cls, *args: object, **kwargs: object) -> NoReturn:
raise RuntimeError(
f"Cannot create {cls.__name__} -- psycopg module is not installed"
)


try:
from .sqlite import Sqlite3Engine
except ImportError:
Expand All @@ -56,7 +68,10 @@ def create_engine(database_config: Mapping[str, Any]) -> BaseDatabaseEngine:
return Sqlite3Engine(database_config)

if name == "psycopg2":
return PostgresEngine(database_config)
return Psycopg2Engine(database_config)

if name == "psycopg":
return PsycopgEngine(database_config)

raise RuntimeError("Unsupported database engine '%s'" % (name,))

Expand Down
7 changes: 5 additions & 2 deletions synapse/storage/engines/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,12 @@ class IncorrectDatabaseSetup(RuntimeError):

ConnectionType = TypeVar("ConnectionType", bound=Connection)
CursorType = TypeVar("CursorType", bound=Cursor)
IsolationLevelType = TypeVar("IsolationLevelType")


class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCMeta):
class BaseDatabaseEngine(
Generic[ConnectionType, CursorType, IsolationLevelType], metaclass=abc.ABCMeta
):
def __init__(self, module: DBAPI2Module, config: Mapping[str, Any]):
self.module = module

Expand Down Expand Up @@ -129,7 +132,7 @@ def attempt_to_set_autocommit(self, conn: ConnectionType, autocommit: bool) -> N

@abc.abstractmethod
def attempt_to_set_isolation_level(
self, conn: ConnectionType, isolation_level: Optional[int]
self, conn: ConnectionType, isolation_level: Optional[IsolationLevelType]
) -> None:
"""Attempt to set the connections isolation level.

Expand Down
90 changes: 33 additions & 57 deletions synapse/storage/engines/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,19 @@
#
#

import abc
import logging
from typing import TYPE_CHECKING, Any, Mapping, NoReturn, Optional, Tuple, cast

import psycopg2.extensions
from typing import TYPE_CHECKING, Any, Generic, Mapping, Optional, Tuple, cast

from synapse.storage.engines._base import (
AUTO_INCREMENT_PRIMARY_KEYPLACEHOLDER,
BaseDatabaseEngine,
ConnectionType,
CursorType,
IncorrectDatabaseSetup,
IsolationLevel,
IsolationLevelType,
)
from synapse.storage.types import Cursor
from synapse.storage.types import Cursor, DBAPI2Module

if TYPE_CHECKING:
from synapse.storage.database import LoggingDatabaseConnection
Expand All @@ -40,19 +41,16 @@


class PostgresEngine(
BaseDatabaseEngine[psycopg2.extensions.connection, psycopg2.extensions.cursor]
Generic[ConnectionType, CursorType, IsolationLevelType],
BaseDatabaseEngine[ConnectionType, CursorType, IsolationLevelType],
metaclass=abc.ABCMeta,
):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a mini docstring pointing out this is (now) an abstract class, with Pyscopg2Engine and PsycopgEngine as concrete implementations, might be nice

def __init__(self, database_config: Mapping[str, Any]):
super().__init__(psycopg2, database_config)
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
isolation_level_map: Mapping[int, IsolationLevelType]
default_isolation_level: IsolationLevelType

# Disables passing `bytes` to txn.execute, c.f.
# https://github.com/matrix-org/synapse/issues/6186. If you do
# actually want to use bytes than wrap it in `bytearray`.
def _disable_bytes_adapter(_: bytes) -> NoReturn:
raise Exception("Passing bytes to DB is disabled.")
def __init__(self, module: DBAPI2Module, database_config: Mapping[str, Any]):
super().__init__(module, database_config)

psycopg2.extensions.register_adapter(bytes, _disable_bytes_adapter)
self.synchronous_commit: bool = database_config.get("synchronous_commit", True)
# Set the statement timeout to 1 hour by default.
# Any query taking more than 1 hour should probably be considered a bug;
Expand All @@ -65,19 +63,19 @@ def _disable_bytes_adapter(_: bytes) -> NoReturn:
)
self._version: Optional[int] = None # unknown as yet

self.isolation_level_map: Mapping[int, int] = {
IsolationLevel.READ_COMMITTED: psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED,
IsolationLevel.REPEATABLE_READ: psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ,
IsolationLevel.SERIALIZABLE: psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE,
}
self.default_isolation_level = (
psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ
)
self.config = database_config

def set_statement_timeout(self, cursor: Cursor, statement_timeout: int) -> None:
@abc.abstractmethod
def get_server_version(self, db_conn: ConnectionType) -> int:
"""Gets called when setting up a brand new database. This allows us to
apply stricter checks on new databases versus existing database.
"""
...

@abc.abstractmethod
def set_statement_timeout(self, cursor: CursorType, statement_timeout: int) -> None:
"""Configure the current cursor's statement timeout."""
cursor.execute("SET statement_timeout TO ?", (statement_timeout,))
...

@property
def single_threaded(self) -> bool:
Expand All @@ -92,21 +90,22 @@ def get_db_locale(self, txn: Cursor) -> Tuple[str, str]:

def check_database(
self,
db_conn: psycopg2.extensions.connection,
db_conn: ConnectionType,
allow_outdated_version: bool = False,
) -> None:
# Get the version of PostgreSQL that we're using. As per the psycopg2
# docs: The number is formed by converting the major, minor, and
# revision numbers into two-decimal-digit numbers and appending them
# together. For example, version 8.1.5 will be returned as 80105
self._version = db_conn.server_version
self._version = self.get_server_version(db_conn)
allow_unsafe_locale = self.config.get("allow_unsafe_locale", False)

# Are we on a supported PostgreSQL version?
if not allow_outdated_version and self._version < 110000:
raise RuntimeError("Synapse requires PostgreSQL 11 or above.")

with db_conn.cursor() as txn:
# psycopg and psycopg2 both support using cursors as context managers.
with db_conn.cursor() as txn: # type: ignore[attr-defined]
txn.execute("SHOW SERVER_ENCODING")
rows = txn.fetchall()
if rows and rows[0][0] != "UTF8":
Expand Down Expand Up @@ -173,7 +172,8 @@ def convert_param_style(self, sql: str) -> str:
return sql.replace("?", "%s")

def on_new_connection(self, db_conn: "LoggingDatabaseConnection") -> None:
db_conn.set_isolation_level(self.default_isolation_level)
# mypy doesn't realize that ConnectionType matches the Connection protocol.
self.attempt_to_set_isolation_level(db_conn.conn, self.default_isolation_level) # type: ignore[arg-type]

# Set the bytea output to escape, vs the default of hex
cursor = db_conn.cursor()
Expand Down Expand Up @@ -202,16 +202,9 @@ def supports_returning(self) -> bool:
"""Do we support the `RETURNING` clause in insert/update/delete?"""
return True

def is_deadlock(self, error: Exception) -> bool:
if isinstance(error, psycopg2.DatabaseError):
# https://www.postgresql.org/docs/current/static/errcodes-appendix.html
# "40001" serialization_failure
# "40P01" deadlock_detected
return error.pgcode in ["40001", "40P01"]
return False

def is_connection_closed(self, conn: psycopg2.extensions.connection) -> bool:
return bool(conn.closed)
def is_connection_closed(self, conn: ConnectionType) -> bool:
# Both psycopg and psycopg2 connections have a closed attributed.
return bool(conn.closed) # type: ignore[attr-defined]

def lock_table(self, txn: Cursor, table: str) -> None:
txn.execute("LOCK TABLE %s in EXCLUSIVE MODE" % (table,))
Expand All @@ -234,25 +227,8 @@ def server_version(self) -> str:
def row_id_name(self) -> str:
return "ctid"

def in_transaction(self, conn: psycopg2.extensions.connection) -> bool:
return conn.status != psycopg2.extensions.STATUS_READY

def attempt_to_set_autocommit(
self, conn: psycopg2.extensions.connection, autocommit: bool
) -> None:
return conn.set_session(autocommit=autocommit)

def attempt_to_set_isolation_level(
self, conn: psycopg2.extensions.connection, isolation_level: Optional[int]
) -> None:
if isolation_level is None:
isolation_level = self.default_isolation_level
else:
isolation_level = self.isolation_level_map[isolation_level]
return conn.set_isolation_level(isolation_level)

@staticmethod
def executescript(cursor: psycopg2.extensions.cursor, script: str) -> None:
def executescript(cursor: CursorType, script: str) -> None:
"""Execute a chunk of SQL containing multiple semicolon-delimited statements.

Psycopg2 seems happy to do this in DBAPI2's `execute()` function.
Expand Down
96 changes: 96 additions & 0 deletions synapse/storage/engines/psycopg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Copyright 2022-2023 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import Any, Mapping, Optional, Tuple

import psycopg
import psycopg.errors
import psycopg.sql

from twisted.enterprise.adbapi import Connection as TxConnection

from synapse.storage.engines import PostgresEngine
from synapse.storage.engines._base import IsolationLevel

logger = logging.getLogger(__name__)


class PsycopgEngine(
# mypy doesn't seem to like that the psycopg Connection and Cursor are Generics.
PostgresEngine[ # type: ignore[type-var]
psycopg.Connection[Tuple], psycopg.Cursor[Tuple], psycopg.IsolationLevel
]
):
def __init__(self, database_config: Mapping[str, Any]):
super().__init__(psycopg, database_config) # type: ignore[arg-type]
# psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)

# Disables passing `bytes` to txn.execute, c.f. #6186. If you do
# actually want to use bytes than wrap it in `bytearray`.
# def _disable_bytes_adapter(_: bytes) -> NoReturn:
# raise Exception("Passing bytes to DB is disabled.")

self.isolation_level_map = {
IsolationLevel.READ_COMMITTED: psycopg.IsolationLevel.READ_COMMITTED,
IsolationLevel.REPEATABLE_READ: psycopg.IsolationLevel.REPEATABLE_READ,
IsolationLevel.SERIALIZABLE: psycopg.IsolationLevel.SERIALIZABLE,
}
self.default_isolation_level = psycopg.IsolationLevel.REPEATABLE_READ

def get_server_version(self, db_conn: psycopg.Connection) -> int:
return db_conn.info.server_version

def set_statement_timeout(
self, cursor: psycopg.Cursor, statement_timeout: int
) -> None:
"""Configure the current cursor's statement timeout."""
cursor.execute(
psycopg.sql.SQL("SET statement_timeout TO {}").format(statement_timeout)
)

def convert_param_style(self, sql: str) -> str:
# if isinstance(sql, psycopg.sql.Composed):
# return sql

return sql.replace("?", "%s")

def is_deadlock(self, error: Exception) -> bool:
if isinstance(error, psycopg.errors.Error):
# https://www.postgresql.org/docs/current/static/errcodes-appendix.html
# "40001" serialization_failure
# "40P01" deadlock_detected
return error.sqlstate in ["40001", "40P01"]
return False

def in_transaction(self, conn: psycopg.Connection) -> bool:
return conn.info.transaction_status != psycopg.pq.TransactionStatus.IDLE

def attempt_to_set_autocommit(
self, conn: psycopg.Connection, autocommit: bool
) -> None:
# Sometimes this gets called with a Twisted connection instead, unwrap
# it because it doesn't support __setattr__.
if isinstance(conn, TxConnection):
conn = conn._connection
conn.autocommit = autocommit

def attempt_to_set_isolation_level(
self, conn: psycopg.Connection, isolation_level: Optional[int]
) -> None:
if isolation_level is None:
pg_isolation_level = self.default_isolation_level
else:
pg_isolation_level = self.isolation_level_map[isolation_level]
conn.isolation_level = pg_isolation_level
Loading