Skip to content

Commit f87e241

Browse files
agoscinskiGeigerJ2
andauthored
šŸ› aiida_computer: handle IntegrityError races (#7349)
Under `pytest-xdist`, the session-scoped `aiida_profile` is shared across workers. When one worker calls `aiida_profile_clean` it wipes the database mid-session, so subsequent calls to `aiida_computer` in other workers hit a TOCTOU race: `get()` finds no row, but `store()` raises `IntegrityError` because another worker already re-created the computer in the meantime. Band-aid fix for the UNIQUE-constraint failures reported in #7347: - Catch `IntegrityError` from `store()` and fall back to `get()` on the same four-field match (label, hostname, scheduler_type, transport_type), consistent with the original lookup. - If the fallback `get()` also raises `NotExistent` (rare: the racing worker's row already vanished), rebuild and store fresh. - Apply caller-provided `minimum_job_poll_interval` and `default_mpiprocs_per_machine` on every recovery path so the caller's values are never silently dropped. Add a regression test for each recovery branch. Co-authored-by: Julian Geiger <julian.geiger@psi.ch>
1 parent db1a28b commit f87e241

2 files changed

Lines changed: 93 additions & 8 deletions

File tree

ā€Žsrc/aiida/tools/pytest_fixtures/orm.pyā€Ž

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -91,24 +91,41 @@ def factory(
9191
) -> 'Computer':
9292
import uuid
9393

94-
from aiida.common.exceptions import NotExistent
94+
from aiida.common.exceptions import IntegrityError, NotExistent
9595
from aiida.orm import Computer
9696

9797
label = label or f'test-computer-{uuid.uuid4().hex}'
9898

99-
try:
100-
computer = Computer.collection.get(
101-
label=label, hostname=hostname, scheduler_type=scheduler_type, transport_type=transport_type
102-
)
103-
except NotExistent:
104-
computer = Computer(
99+
def _build() -> 'Computer':
100+
return Computer(
105101
label=label,
106102
hostname=hostname,
107103
workdir=str(tmp_path),
108104
transport_type=transport_type,
109105
scheduler_type=scheduler_type,
110106
)
111-
computer.store()
107+
108+
try:
109+
computer = Computer.collection.get(
110+
label=label, hostname=hostname, scheduler_type=scheduler_type, transport_type=transport_type
111+
)
112+
except NotExistent:
113+
computer = _build()
114+
try:
115+
computer.store()
116+
except IntegrityError:
117+
# Another xdist worker concurrently re-created this computer after
118+
# ``aiida_profile_clean`` reset the shared database. Re-querying is
119+
# safe because the ``psql_dos`` storage backend rolls back our failed
120+
# transaction before raising (see ``psql_dos/orm/utils.py``).
121+
try:
122+
computer = Computer.collection.get(
123+
label=label, hostname=hostname, scheduler_type=scheduler_type, transport_type=transport_type
124+
)
125+
except NotExistent:
126+
# Row no longer present (rare): rebuild and store fresh.
127+
computer = _build()
128+
computer.store()
112129
computer.set_minimum_job_poll_interval(minimum_job_poll_interval)
113130
computer.set_default_mpiprocs_per_machine(default_mpiprocs_per_machine)
114131

ā€Žtests/tools/pytest_fixtures/test_orm.pyā€Ž

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
"""Tests for the :mod:`aiida.tools.pytest_fixtures.orm` module."""
22

33
import uuid
4+
from unittest.mock import patch
45

56
import pytest
67

8+
from aiida.common.exceptions import IntegrityError, NotExistent
79
from aiida.orm import Computer
810
from aiida.transports import AsyncTransport, BlockingTransport
911
from aiida.transports.plugins.async_backend import _AsyncSSH, _OpenSSH
@@ -50,6 +52,72 @@ def test_aiida_computer_fixtures(fixture_name, transport_cls, transport_type, re
5052
assert not computer_unconfigured.is_configured
5153

5254

55+
@pytest.mark.usefixtures('aiida_profile_clean')
56+
def test_aiida_computer_integrity_error_fallback(aiida_computer):
57+
"""The fallback ``get()`` returns the racing worker's row.
58+
59+
Simulates the xdist race where a row exists in the shared DB but our session
60+
can't see it on the first ``get()`` (as if ``aiida_profile_clean`` just wiped
61+
things from this worker's POV); ``store()`` then fails with ``IntegrityError``
62+
on the UNIQUE label constraint, and the fallback ``get()`` finds the row.
63+
"""
64+
label = f'test-fallback-{uuid.uuid4().hex}'
65+
existing = aiida_computer(label=label)
66+
67+
with (
68+
patch.object(
69+
type(Computer.collection),
70+
'get',
71+
side_effect=[NotExistent('First get: DB was just cleaned'), existing],
72+
),
73+
patch.object(Computer, 'store', side_effect=IntegrityError('UNIQUE constraint failed')),
74+
):
75+
recovered = aiida_computer(label=label)
76+
77+
assert recovered.pk == existing.pk
78+
assert recovered.uuid == existing.uuid
79+
80+
81+
@pytest.mark.usefixtures('aiida_profile_clean')
82+
def test_aiida_computer_integrity_error_rebuild(aiida_computer):
83+
"""The fallback ``get()`` also misses, so the fixture rebuilds and stores fresh.
84+
85+
Rare follow-up to the fallback case: ``store()`` raised ``IntegrityError`` (so
86+
the racing worker's row should be there), but by the time we re-query it has
87+
already vanished. The fixture must then build a new ``Computer`` and store it.
88+
"""
89+
label = f'test-rebuild-{uuid.uuid4().hex}'
90+
91+
# store() fails the first time (simulating the race) and persists for real on
92+
# the second call (the rebuild). ``autospec=True`` is required so the mock
93+
# passes ``self`` to ``store_side_effect``, which we need to call through.
94+
original_store = Computer.store
95+
store_calls = {'n': 0}
96+
97+
def store_side_effect(self):
98+
store_calls['n'] += 1
99+
if store_calls['n'] == 1:
100+
raise IntegrityError('UNIQUE constraint failed')
101+
return original_store(self)
102+
103+
with (
104+
patch.object(
105+
type(Computer.collection),
106+
'get',
107+
side_effect=[
108+
NotExistent('First get: DB was just cleaned'),
109+
NotExistent('Second get: racing worker row vanished'),
110+
],
111+
),
112+
patch.object(Computer, 'store', autospec=True, side_effect=store_side_effect),
113+
):
114+
recovered = aiida_computer(label=label)
115+
116+
assert recovered.is_stored
117+
assert recovered.label == label
118+
assert store_calls['n'] == 2
119+
120+
53121
@pytest.mark.parametrize(
54122
'backend, backend_class',
55123
[

0 commit comments

Comments
Ā (0)
⚔