Skip to content

Commit 4ba4a90

Browse files
committed
Align history-audit trigger SQL between migration and runtime installer
The follow-up pg_notify migration b8d5e2f9a1c7 and runtime installer update_audit_table.py carried two independent copies of the plpgsql trigger function body, and disagreed on the STATEMENT-vs-ROW threshold: - c716ee82337b (last trigger migration) used `version > 10`, so on PG 10 it installed a ROW trigger + ROW-variant function. - b8d5e2f9a1c7 used `>= 10`, so on PG 10 its upgrade() overwrote the function body with the STATEMENT variant (references `new_table`) while the trigger left behind by c716ee82337b was still ROW — the next audit write would raise "missing FROM-clause entry for new_table". - update_audit_table.py (fresh install) used `>= 10`, which is internally consistent for fresh installs but diverges from the upgrade path. Extract `build_trigger_fn(function_name, id_field, use_statement, with_notify)` and `use_statement_trigger(version)` from update_audit_table.py, and have the migration import them. Now both paths share one SQL body and one predicate, and the predicate matches c716ee82337b so upgraded PG 10 databases keep a consistent ROW trigger + ROW-variant function. (PG 10 is long past EOL, but drift between runtime install and migration is exactly the class of bug this migration was written to repair.)
1 parent 4bddc2e commit 4ba4a90

2 files changed

Lines changed: 83 additions & 107 deletions

File tree

lib/galaxy/model/migrations/alembic/versions_gxy/b8d5e2f9a1c7_add_pg_notify_to_history_audit_triggers.py

Lines changed: 14 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@
1414
affected history id, matching `model/triggers/update_audit_table.py` used on
1515
fresh installs.
1616
17+
The STATEMENT-vs-ROW decision must match the trigger DEFINITION installed by
18+
`c716ee82337b` so the function body references the right context (`new_table`
19+
vs `NEW`); both use `version > 10` (and treat offline mode as STATEMENT).
20+
1721
SQLite installations use the poll-only path and require no change.
1822
"""
1923

@@ -23,16 +27,18 @@
2327
_is_sqlite,
2428
transaction,
2529
)
30+
from galaxy.model.triggers.update_audit_table import (
31+
build_trigger_fn,
32+
fn_prefix,
33+
use_statement_trigger,
34+
)
2635

2736
revision = "b8d5e2f9a1c7"
2837
down_revision = "f5e9e4bca542"
2938
branch_labels = None
3039
depends_on = None
3140

3241

33-
CHANNEL = "galaxy_history_update"
34-
35-
3642
def upgrade():
3743
if _is_sqlite():
3844
return
@@ -47,58 +53,10 @@ def downgrade():
4753
_install_functions(with_notify=False)
4854

4955

50-
def _install_functions(with_notify: bool):
56+
def _install_functions(with_notify: bool) -> None:
5157
version_info = op.get_bind().engine.dialect.server_version_info
52-
use_statement = version_info is None or version_info[0] >= 10
53-
builder = _statement_trigger_fn if use_statement else _row_trigger_fn
58+
# Offline mode (no live connection) matches c716ee82337b: assume STATEMENT.
59+
statement = version_info is None or use_statement_trigger(version_info[0])
5460
for id_field in ("history_id", "id"):
55-
op.execute(builder(f"fn_audit_history_by_{id_field}", id_field, with_notify))
56-
57-
58-
def _statement_trigger_fn(function_name: str, id_field: str, with_notify: bool) -> str:
59-
notify_block = (
60-
f"""
61-
FOR _history_id IN SELECT DISTINCT {id_field} FROM new_table WHERE {id_field} IS NOT NULL
62-
LOOP
63-
PERFORM pg_notify('{CHANNEL}', _history_id::text);
64-
END LOOP;
65-
"""
66-
if with_notify
67-
else ""
68-
)
69-
declare_block = "DECLARE _history_id integer;" if with_notify else ""
70-
return f"""
71-
CREATE OR REPLACE FUNCTION {function_name}()
72-
RETURNS TRIGGER
73-
LANGUAGE 'plpgsql'
74-
AS $BODY$
75-
{declare_block}
76-
BEGIN
77-
INSERT INTO history_audit (history_id, update_time)
78-
SELECT DISTINCT {id_field}, clock_timestamp() AT TIME ZONE 'UTC'
79-
FROM new_table
80-
WHERE {id_field} IS NOT NULL
81-
ON CONFLICT DO NOTHING;
82-
{notify_block}
83-
RETURN NULL;
84-
END;
85-
$BODY$
86-
"""
87-
88-
89-
def _row_trigger_fn(function_name: str, id_field: str, with_notify: bool) -> str:
90-
notify_stmt = f"PERFORM pg_notify('{CHANNEL}', NEW.{id_field}::text);" if with_notify else ""
91-
return f"""
92-
CREATE OR REPLACE FUNCTION {function_name}()
93-
RETURNS TRIGGER
94-
LANGUAGE 'plpgsql'
95-
AS $BODY$
96-
BEGIN
97-
INSERT INTO history_audit (history_id, update_time)
98-
VALUES (NEW.{id_field}, clock_timestamp() AT TIME ZONE 'UTC')
99-
ON CONFLICT DO NOTHING;
100-
{notify_stmt}
101-
RETURN NULL;
102-
END;
103-
$BODY$
104-
"""
61+
fn_name = f"{fn_prefix}_{id_field}"
62+
op.execute(build_trigger_fn(fn_name, id_field, use_statement=statement, with_notify=with_notify))

lib/galaxy/model/triggers/update_audit_table.py

Lines changed: 69 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
# function name prefix
44
fn_prefix = "fn_audit_history_by"
55

6+
# channel used by pg_notify so HistoryAuditMonitor can LISTEN for updates
7+
NOTIFY_CHANNEL = "galaxy_history_update"
8+
69
# map between source table and associated incoming id field
710
trigger_config = {
811
"history_dataset_association": "history_id",
@@ -11,6 +14,68 @@
1114
}
1215

1316

17+
def use_statement_trigger(version: int) -> bool:
18+
"""Return True when the postgres version supports the STATEMENT variant.
19+
20+
Fresh installs and the pg_notify migration share this predicate to ensure
21+
the trigger function body (STATEMENT references new_table, ROW references NEW)
22+
matches the trigger definition installed at that version.
23+
"""
24+
return version > 10
25+
26+
27+
def build_trigger_fn(function_name: str, id_field: str, *, use_statement: bool, with_notify: bool = True) -> str:
28+
"""Build the plpgsql CREATE OR REPLACE FUNCTION body for an audit trigger.
29+
30+
Shared between runtime install (update_audit_table.install) and alembic
31+
migrations so the two cannot drift.
32+
"""
33+
if use_statement:
34+
notify_block = (
35+
f"""
36+
FOR _history_id IN SELECT DISTINCT {id_field} FROM new_table WHERE {id_field} IS NOT NULL
37+
LOOP
38+
PERFORM pg_notify('{NOTIFY_CHANNEL}', _history_id::text);
39+
END LOOP;
40+
"""
41+
if with_notify
42+
else ""
43+
)
44+
declare_block = "DECLARE _history_id integer;" if with_notify else ""
45+
return f"""
46+
CREATE OR REPLACE FUNCTION {function_name}()
47+
RETURNS TRIGGER
48+
LANGUAGE 'plpgsql'
49+
AS $BODY$
50+
{declare_block}
51+
BEGIN
52+
INSERT INTO history_audit (history_id, update_time)
53+
SELECT DISTINCT {id_field}, clock_timestamp() AT TIME ZONE 'UTC'
54+
FROM new_table
55+
WHERE {id_field} IS NOT NULL
56+
ON CONFLICT DO NOTHING;
57+
{notify_block}
58+
RETURN NULL;
59+
END;
60+
$BODY$
61+
"""
62+
notify_stmt = f"PERFORM pg_notify('{NOTIFY_CHANNEL}', NEW.{id_field}::text);" if with_notify else ""
63+
return f"""
64+
CREATE OR REPLACE FUNCTION {function_name}()
65+
RETURNS TRIGGER
66+
LANGUAGE 'plpgsql'
67+
AS $BODY$
68+
BEGIN
69+
INSERT INTO history_audit (history_id, update_time)
70+
VALUES (NEW.{id_field}, clock_timestamp() AT TIME ZONE 'UTC')
71+
ON CONFLICT DO NOTHING;
72+
{notify_stmt}
73+
RETURN NULL;
74+
END;
75+
$BODY$
76+
"""
77+
78+
1479
def install(engine):
1580
"""Install history audit table triggers"""
1681
sql = _postgres_install(engine) if "postgres" in engine.name else _sqlite_install()
@@ -41,54 +106,6 @@ def _postgres_install(engine):
41106

42107
sql = []
43108

44-
# PostgreSQL trigger function template
45-
# need to make separate functions purely because the incoming history_id field name will be
46-
# different for different source tables. There may be a fancier way to dynamically choose
47-
# between incoming fields, but having 2 triggers fns seems straightforward
48-
49-
def statement_trigger_fn(id_field):
50-
fn = f"{fn_prefix}_{id_field}"
51-
52-
return f"""
53-
CREATE OR REPLACE FUNCTION {fn}()
54-
RETURNS TRIGGER
55-
LANGUAGE 'plpgsql'
56-
AS $BODY$
57-
DECLARE
58-
_history_id integer;
59-
BEGIN
60-
INSERT INTO history_audit (history_id, update_time)
61-
SELECT DISTINCT {id_field}, clock_timestamp() AT TIME ZONE 'UTC'
62-
FROM new_table
63-
WHERE {id_field} IS NOT NULL
64-
ON CONFLICT DO NOTHING;
65-
FOR _history_id IN SELECT DISTINCT {id_field} FROM new_table WHERE {id_field} IS NOT NULL
66-
LOOP
67-
PERFORM pg_notify('galaxy_history_update', _history_id::text);
68-
END LOOP;
69-
RETURN NULL;
70-
END;
71-
$BODY$
72-
"""
73-
74-
def row_trigger_fn(id_field):
75-
fn = f"{fn_prefix}_{id_field}"
76-
77-
return f"""
78-
CREATE OR REPLACE FUNCTION {fn}()
79-
RETURNS TRIGGER
80-
LANGUAGE 'plpgsql'
81-
AS $BODY$
82-
BEGIN
83-
INSERT INTO history_audit (history_id, update_time)
84-
VALUES (NEW.{id_field}, clock_timestamp() AT TIME ZONE 'UTC')
85-
ON CONFLICT DO NOTHING;
86-
PERFORM pg_notify('galaxy_history_update', NEW.{id_field}::text);
87-
RETURN NULL;
88-
END;
89-
$BODY$
90-
"""
91-
92109
def trigger_def(source_table: str, id_field: str, operation: str, version: int, when: str = "AFTER") -> str:
93110
fn = f"{fn_prefix}_{id_field}"
94111
# PostgreSQL supports many triggers per operation/table so the label can
@@ -100,7 +117,7 @@ def trigger_def(source_table: str, id_field: str, operation: str, version: int,
100117
# The use of the keyword PROCEDURE here is historical and deprecated (https://www.postgresql.org/docs/11/sql-createtrigger.html).
101118
function_keyword = "FUNCTION" if version >= 11 else "PROCEDURE"
102119
create_or_replace = "CREATE OR REPLACE" if version >= 14 else "CREATE"
103-
if version >= 10 and when == "AFTER":
120+
if use_statement_trigger(version) and when == "AFTER":
104121
return f"""
105122
{create_or_replace} TRIGGER {trigger_name}
106123
AFTER {operation}
@@ -121,10 +138,11 @@ def trigger_def(source_table: str, id_field: str, operation: str, version: int,
121138

122139
# pick row or statement triggers depending on postgres version
123140
version = engine.dialect.server_version_info[0]
124-
trigger_fn = statement_trigger_fn if version >= 10 else row_trigger_fn
141+
statement = use_statement_trigger(version)
125142

126143
for id_field in ["history_id", "id"]:
127-
sql.append(trigger_fn(id_field))
144+
fn_name = f"{fn_prefix}_{id_field}"
145+
sql.append(build_trigger_fn(fn_name, id_field, use_statement=statement, with_notify=True))
128146

129147
for source_table, id_field in trigger_config.items():
130148
for operation in ["UPDATE", "INSERT"]:

0 commit comments

Comments
 (0)