Commit 4580b48

davelopez and Copilot committed

Ensure safe rollback on checksum failure during dataset move
Adds cleanup of partially transferred files in the target store when a checksum verification error or other exception occurs during cross-device dataset transfer. Updates tests to verify that failed transfers do not result in data loss and are safely rerunnable.

Co-authored-by: Copilot <copilot@github.com>
1 parent cdd0ab7 commit 4580b48
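
For context, the pattern this commit applies — copy to the target store, verify a checksum, and remove the partially written target copy on any failure so the source remains intact and the move can be re-run — can be sketched standalone as below. This is a simplified illustration using only the Python standard library, not Galaxy's object-store API; the names move_with_verification and ChecksumMismatch are hypothetical.

import hashlib
import os
import shutil


class ChecksumMismatch(Exception):
    """Raised when the copied file's checksum differs from the source's."""


def _sha256(path: str) -> str:
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(1 << 20), b""):
            digest.update(chunk)
    return digest.hexdigest()


def move_with_verification(source: str, target: str) -> None:
    # Copy first; the source is never touched until the target is verified.
    shutil.copy2(source, target)
    try:
        if _sha256(source) != _sha256(target):
            raise ChecksumMismatch(f"checksum of {target} does not match {source}")
    except Exception:
        # Roll back: best-effort removal of the partial/corrupt target copy,
        # leaving the source as the single authoritative copy so the move
        # can simply be retried.
        try:
            os.remove(target)
        except OSError:
            pass
        raise
    os.remove(source)  # drop the source only after the target is verified

A mismatch (or any other exception) leaves only the original file on disk, which is the invariant the new integration test below asserts.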

2 files changed: 167 additions & 2 deletions

lib/galaxy/managers/dataset_storage_operations.py

Lines changed: 42 additions & 1 deletion
@@ -993,6 +993,8 @@ def _record_ineligible(
 
     def _execute_dataset_transfer(self, dataset: Dataset, dataset_id: int, quota_delta: int):
         source_proxy = self._dataset_proxy(dataset, str(dataset.object_store_id))
+        target_proxy: Optional[DatasetObjectStoreProxy] = None
+        extra_files_path_name = dataset.extra_files_path_name
         if not self._source_dataset_exists(source_proxy):
             self._record_transfer_failure(
                 dataset_id,
@@ -1004,10 +1006,11 @@ def _execute_dataset_transfer(self, dataset: Dataset, dataset_id: int, quota_del
         try:
             bytes_processed = 0
             if self.storage_operation_manager.requires_data_transfer(dataset, self.run.target_object_store_id):
+                target_proxy = self._dataset_proxy(dataset, self.run.target_object_store_id)
                 bytes_processed = self._copy_dataset_to_target_store(dataset, self.run.target_object_store_id)
                 self._verify_copied_dataset_integrity(dataset, self.run.target_object_store_id)
                 self._finalize_cross_device_move(dataset, self.run.target_object_store_id)
-                self._cleanup_source_dataset_data(source_proxy, dataset.extra_files_path_name)
+                self._cleanup_source_dataset_data(source_proxy, extra_files_path_name)
             else:
                 self.dataset_manager.update_object_store_id(self.trans, dataset, self.run.target_object_store_id)
 
@@ -1017,13 +1020,17 @@ def _execute_dataset_transfer(self, dataset: Dataset, dataset_id: int, quota_del
             self._notify_dataset_update(dataset)
         except ChecksumVerificationError as exc:
             log.warning("Integrity verification failed for run %s dataset %s: %s", self.run.id, dataset.id, exc)
+            if target_proxy is not None:
+                self._cleanup_target_dataset_data(target_proxy, extra_files_path_name)
             self._record_transfer_failure(
                 dataset_id,
                 DatasetStorageOperationFailureReasonCode.checksum_verification_failed,
                 self._CHECKSUM_FAILURE_MESSAGE,
             )
         except Exception:
             log.exception("Storage operation execution error for run %s dataset %s", self.run.id, dataset.id)
+            if target_proxy is not None:
+                self._cleanup_target_dataset_data(target_proxy, extra_files_path_name)
             self._record_transfer_failure(
                 dataset_id,
                 DatasetStorageOperationFailureReasonCode.execution_error,
@@ -1224,6 +1231,40 @@ def _cleanup_source_dataset_data(
                 exc_info=True,
             )
 
+    def _cleanup_target_dataset_data(
+        self,
+        target_proxy: DatasetObjectStoreProxy,
+        extra_files_path_name: Optional[str],
+    ) -> None:
+        try:
+            self.app.object_store.delete(target_proxy)
+        except Exception:
+            log.warning(
+                "Failed to delete target dataset file while rolling back failed storage move for run %s dataset %s",
+                self.run.id,
+                target_proxy.id,
+                exc_info=True,
+            )
+
+        if not extra_files_path_name:
+            return
+
+        try:
+            if self.app.object_store.exists(target_proxy, dir_only=True, extra_dir=extra_files_path_name):
+                self.app.object_store.delete(
+                    target_proxy,
+                    entire_dir=True,
+                    extra_dir=extra_files_path_name,
+                    dir_only=True,
+                )
+        except Exception:
+            log.warning(
+                "Failed to delete target extra files while rolling back failed storage move for run %s dataset %s",
+                self.run.id,
+                target_proxy.id,
+                exc_info=True,
+            )
+
     def _finalize_cross_device_move(self, dataset: Dataset, target_object_store_id: str):
         old_object_store_id = dataset.object_store_id
         quota_source_map = self.app.object_store.get_quota_source_map()

test/integration/objectstore/test_bulk_storage_operations.py

Lines changed: 125 additions & 1 deletion
@@ -25,8 +25,22 @@
 
 import os
 import string
-from typing import Any
+from typing import (
+    Any,
+    cast,
+)
+from unittest.mock import patch
 
+from galaxy.managers.dataset_storage_operations import (
+    DatasetStorageOperationManager,
+    StorageOperationRunExecutor,
+)
+from galaxy.managers.datasets import DatasetManager
+from galaxy.model import (
+    DatasetStorageOperationSnapshot,
+    User,
+)
+from galaxy.model.scoped_session import galaxy_scoped_session
 from galaxy_test.base.decorators import requires_celery
 from galaxy_test.base.populators import (
     DatasetCollectionPopulator,
@@ -160,6 +174,45 @@ def _store_file_counts(self) -> tuple[int, int]:
         separate_path = os.path.join(self.object_stores_parent, "files_separate")
         return files_count(default_path), files_count(separate_path)
 
+    def _execute_snapshot_sync(
+        self,
+        sa_session: galaxy_scoped_session,
+        snapshot: DatasetStorageOperationSnapshot,
+        *,
+        skip_ineligible: bool,
+        force_checksum_mismatch: bool = False,
+    ) -> str:
+        storage_operation_manager = DatasetStorageOperationManager(self._app.object_store)
+        dataset_manager = DatasetManager(self._app)
+        user = sa_session.get(User, snapshot.user_id)
+
+        run, _ = storage_operation_manager.create_run_and_summary(
+            sa_session=sa_session,
+            snapshot=snapshot,
+            skip_ineligible=skip_ineligible,
+        )
+        executor = storage_operation_manager.create_run_executor(
+            sa_session=sa_session,
+            dataset_manager=dataset_manager,
+            app=self._app,
+            run=run,
+            user=user,
+        )
+
+        if force_checksum_mismatch:
+            # Simulate corruption by forcing source/target checksums to differ during verify step.
+            with patch.object(
+                StorageOperationRunExecutor,
+                "_sha256",
+                autospec=True,
+                side_effect=lambda _self, path: "source-hash" if "files_default" in path else "target-hash",
+            ):
+                executor.execute_run(snapshot)
+        else:
+            executor.execute_run(snapshot)
+
+        return self._app.security.encode_id(run.id)
+
     def _item(self, hda_id: str) -> dict[str, Any]:
         return {"id": hda_id, "history_content_type": "dataset"}

@@ -683,3 +736,74 @@ def test_idempotent_reexecution_mixed_state_no_data_mutation(self):
                 "becomes-ineligible\n",
             )
         self._assert_dataset_store_and_content(history_id, control["id"], DEFAULT_OBJECT_STORE_ID, "control\n")
+
+    def test_cross_device_checksum_mismatch_is_safe_and_rerunnable(self):
+        """Checksum mismatch fails safely (no data loss) and the same snapshot can be re-run successfully."""
+        with self.dataset_populator.test_history() as history_id:
+            hda = self.dataset_populator.new_dataset(history_id, content="checksum-guard", wait=True)
+
+            preview = self._preview_move(
+                history_id,
+                SEPARATE_DEVICE_OBJECT_STORE_ID,
+                [self._item(hda["id"])],
+            )
+            self._assert_eligibility(preview, eligible=1, ineligible=0)
+            baseline_default_count, baseline_separate_count = self._store_file_counts()
+
+            # Force checksum mismatch during execution to simulate corruption or other transfer failure.
+            # This uses a synchronous execution of the snapshot to ensure the mismatch occurs in the first run
+            # and allows re-running the same snapshot without needing to wait for a real async run to complete.
+            sa_session = cast(galaxy_scoped_session, self._app.model.session)
+            snapshot_id = self._app.security.decode_id(preview["snapshot_id"])
+            snapshot = sa_session.get(DatasetStorageOperationSnapshot, snapshot_id)
+            assert snapshot is not None
+            first_run_id = self._execute_snapshot_sync(
+                sa_session,
+                snapshot,
+                skip_ineligible=False,
+                force_checksum_mismatch=True,
+            )
+            first_run_status = self.dataset_populator.storage_run_status(history_id, first_run_id)
+            self._assert_run_counts(first_run_status["run"], succeeded=0, failed=1, skipped=0)
+
+            first_run_items = self._run_items(history_id, first_run_id)
+            assert len(first_run_items) == 1
+            assert first_run_items[0]["dataset_id"] == hda["id"]
+            assert first_run_items[0]["state"] == "failed"
+            assert first_run_items[0]["reason_code"] == "checksum_verification_failed"
+
+            # Failed transfer should rollback target writes (no leftover files).
+            failed_default_count, failed_separate_count = self._store_file_counts()
+            assert failed_default_count == baseline_default_count
+            assert failed_separate_count == baseline_separate_count
+
+            # Failure path must keep data readable from source store (no data loss).
+            self._assert_dataset_store_and_content(
+                history_id,
+                hda["id"],
+                DEFAULT_OBJECT_STORE_ID,
+                "checksum-guard\n",
+            )
+
+            # Re-run same snapshot without forced corruption; move should now succeed.
+            second_run_id = self._execute_snapshot_sync(sa_session, snapshot, skip_ineligible=False)
+            second_run_status = self.dataset_populator.storage_run_status(history_id, second_run_id)
+            self._assert_run_counts(second_run_status["run"], succeeded=1, failed=0, skipped=0)
+
+            second_run_items = self._run_items(history_id, second_run_id)
+            assert len(second_run_items) == 1
+            assert second_run_items[0]["dataset_id"] == hda["id"]
+            assert second_run_items[0]["state"] == "succeeded"
+            assert second_run_items[0]["reason_code"] is None
+            assert second_run_items[0]["bytes_processed"] > 0
+
+            succeeded_default_count, succeeded_separate_count = self._store_file_counts()
+            assert succeeded_default_count == baseline_default_count - 1
+            assert succeeded_separate_count == baseline_separate_count + 1
+
+            self._assert_dataset_store_and_content(
+                history_id,
+                hda["id"],
+                SEPARATE_DEVICE_OBJECT_STORE_ID,
+                "checksum-guard\n",
+            )
