@@ -69,7 +69,7 @@ WITH job_version_io AS (
6969 JSON_AGG(json_build_object('namespace', ds.namespace_name,
7070 'name', ds.name))
7171 FILTER (WHERE io.io_type = 'OUTPUT') AS output_datasets
72- FROM job_versions_io_mapping io
72+ FROM job_io_mapping io
7373 INNER JOIN job_versions jv ON jv.uuid = io.job_version_uuid
7474 INNER JOIN datasets_view ds ON ds.uuid = io.dataset_uuid
7575 INNER JOIN jobs_view j ON j.uuid=jv.job_uuid
@@ -192,40 +192,73 @@ ExtendedJobVersionRow upsertJobVersion(
192192 String namespaceName );
193193
194194 /**
195- * Used to link an input dataset to a given job version.
195+ * Used to upsert an input or output dataset to a given job version.
196196 *
197197 * @param jobVersionUuid The unique ID of the job version.
198- * @param inputDatasetUuid The unique ID of the input dataset.
198+ * @param datasetUuid The unique ID of the output dataset
199+ * @param ioType The {@link IoType} of the dataset.
200+ * @param jobUuid The unique ID of the job.
199201 */
200- default void upsertInputDatasetFor (UUID jobVersionUuid , UUID inputDatasetUuid ) {
201- upsertInputOrOutputDatasetFor (jobVersionUuid , inputDatasetUuid , IoType .INPUT );
202- }
202+ @ SqlUpdate (
203+ """
204+ INSERT INTO job_io_mapping (
205+ job_version_uuid, dataset_uuid, io_type, job_uuid, symlink_target_job_uuid, is_job_version_current)
206+ VALUES (:jobVersionUuid, :datasetUuid, :ioType, :jobUuid, :symlinkTargetJobUuid, TRUE)
207+ ON CONFLICT (job_version_uuid, dataset_uuid, io_type, job_uuid) DO UPDATE SET is_job_version_current = TRUE
208+ """ )
209+ void upsertCurrentInputOrOutputDatasetFor (
210+ UUID jobVersionUuid ,
211+ UUID datasetUuid ,
212+ UUID jobUuid ,
213+ UUID symlinkTargetJobUuid ,
214+ IoType ioType );
215+
216+ @ SqlUpdate (
217+ """
218+ UPDATE job_io_mapping
219+ SET is_job_version_current = FALSE
220+ WHERE (job_uuid = :jobUuid OR symlink_target_job_uuid = :jobUuid)
221+ AND job_version_uuid != :jobVersionUuid
222+ AND io_type = :ioType
223+ AND is_job_version_current = TRUE;
224+ """ )
225+ void markVersionIOMappingObsolete (UUID jobVersionUuid , UUID jobUuid , IoType ioType );
226+
227+ @ SqlUpdate (
228+ """
229+ UPDATE job_io_mapping
230+ SET is_job_version_current = FALSE
231+ WHERE (job_uuid = :jobUuid OR symlink_target_job_uuid = :jobUuid)
232+ AND io_type = :ioType
233+ AND is_job_version_current = TRUE;
234+ """ )
235+ void markVersionIOMappingObsolete (UUID jobUuid , IoType ioType );
203236
204237 /**
205- * Used to link an output dataset to a given job version.
238+ * Used to link an input dataset to a given job version.
206239 *
207- * @param jobVersionUuid The unique ID of the job version .
208- * @param outputDatasetUuid The unique ID of the output dataset .
240+ * @param inputDatasetUuid The unique ID of the input dataset .
241+ * @param jobUuid The unique ID of the job .
209242 */
210- default void upsertOutputDatasetFor (UUID jobVersionUuid , UUID outputDatasetUuid ) {
211- upsertInputOrOutputDatasetFor (jobVersionUuid , outputDatasetUuid , IoType .OUTPUT );
243+ default void upsertInputDatasetFor (
244+ UUID jobVersionUuid , UUID inputDatasetUuid , UUID jobUuid , UUID symlinkTargetJobUuid ) {
245+ markVersionIOMappingObsolete (jobVersionUuid , jobUuid , IoType .INPUT );
246+ upsertCurrentInputOrOutputDatasetFor (
247+ jobVersionUuid , inputDatasetUuid , jobUuid , symlinkTargetJobUuid , IoType .INPUT );
212248 }
213249
214250 /**
215- * Used to upsert an input or output dataset to a given job version.
251+ * Used to link an output dataset to a given job version.
216252 *
217- * @param jobVersionUuid The unique ID of the job version.
218- * @param datasetUuid The unique ID of the output dataset
219- * @param ioType The {@link IoType} of the dataset.
253+ * @param outputDatasetUuid The unique ID of the output dataset.
254+ * @param jobUuid The unique ID of the job.
220255 */
221- @ SqlUpdate (
222- """
223- INSERT INTO job_versions_io_mapping (
224- job_version_uuid, dataset_uuid, io_type)
225- VALUES (:jobVersionUuid, :datasetUuid, :ioType)
226- ON CONFLICT DO NOTHING
227- """ )
228- void upsertInputOrOutputDatasetFor (UUID jobVersionUuid , UUID datasetUuid , IoType ioType );
256+ default void upsertOutputDatasetFor (
257+ UUID jobVersionUuid , UUID outputDatasetUuid , UUID jobUuid , UUID symlinkTargetJobUuid ) {
258+ markVersionIOMappingObsolete (jobVersionUuid , jobUuid , IoType .OUTPUT );
259+ upsertCurrentInputOrOutputDatasetFor (
260+ jobVersionUuid , outputDatasetUuid , jobUuid , symlinkTargetJobUuid , IoType .OUTPUT );
261+ }
229262
230263 /**
231264 * Returns the input datasets to a given job version.
@@ -256,7 +289,7 @@ default List<UUID> findOutputDatasetsFor(UUID jobVersionUuid) {
256289 @ SqlQuery (
257290 """
258291 SELECT dataset_uuid
259- FROM job_versions_io_mapping
292+ FROM job_io_mapping
260293 WHERE job_version_uuid = :jobVersionUuid
261294 AND io_type = :ioType
262295 """ )
@@ -265,7 +298,7 @@ default List<UUID> findOutputDatasetsFor(UUID jobVersionUuid) {
265298 @ SqlQuery (
266299 """
267300 SELECT d.namespace_name, d.name, io.io_type
268- FROM job_versions_io_mapping io
301+ FROM job_io_mapping io
269302 INNER JOIN jobs_view j ON j.current_version_uuid = io.job_version_uuid
270303 INNER JOIN datasets_view d on d.uuid = io.dataset_uuid
271304 WHERE j.name = :jobName AND j.namespace_name=:jobNamespace
@@ -366,14 +399,20 @@ default BagOfJobVersionInfo upsertRunlessJobVersion(
366399 inputs .forEach (
367400 i -> {
368401 jobVersionDao .upsertInputDatasetFor (
369- jobVersionRow .getUuid (), i .getDatasetVersionRow ().getDatasetUuid ());
402+ jobVersionRow .getUuid (),
403+ i .getDatasetVersionRow ().getDatasetUuid (),
404+ jobVersionRow .getJobUuid (),
405+ jobRow .getSymlinkTargetId ());
370406 });
371407
372408 // Link the output datasets to the job version.
373409 outputs .forEach (
374410 o -> {
375411 jobVersionDao .upsertOutputDatasetFor (
376- jobVersionRow .getUuid (), o .getDatasetVersionRow ().getDatasetUuid ());
412+ jobVersionRow .getUuid (),
413+ o .getDatasetVersionRow ().getDatasetUuid (),
414+ jobVersionRow .getJobUuid (),
415+ jobRow .getSymlinkTargetId ());
377416 });
378417
379418 jobDao .updateVersionFor (jobRow .getUuid (), jobRow .getCreatedAt (), jobVersionRow .getUuid ());
@@ -468,14 +507,20 @@ default BagOfJobVersionInfo upsertJobVersionOnRunTransition(
468507 jobVersionInputs .forEach (
469508 jobVersionInput -> {
470509 jobVersionDao .upsertInputDatasetFor (
471- jobVersionRow .getUuid (), jobVersionInput .getDatasetUuid ());
510+ jobVersionRow .getUuid (),
511+ jobVersionInput .getDatasetUuid (),
512+ jobVersionRow .getJobUuid (),
513+ jobRow .getSymlinkTargetId ());
472514 });
473515
474516 // Link the output datasets to the job version.
475517 jobVersionOutputs .forEach (
476518 jobVersionOutput -> {
477519 jobVersionDao .upsertOutputDatasetFor (
478- jobVersionRow .getUuid (), jobVersionOutput .getDatasetUuid ());
520+ jobVersionRow .getUuid (),
521+ jobVersionOutput .getDatasetUuid (),
522+ jobVersionRow .getJobUuid (),
523+ jobRow .getSymlinkTargetId ());
479524 });
480525
481526 // Link the job version to the run.
0 commit comments