Skip to content

Commit 2e64bef

Browse files
authored
Add labels to subworkflows during Carbonite thawing [BA-5875] (broadinstitute#5223)
1 parent f206bee commit 2e64bef

22 files changed

Lines changed: 2380 additions & 43 deletions

File tree

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ lazy val services = project
164164
.dependsOn(ftpFileSystem % "test->test")
165165

166166
lazy val hybridCarboniteMetadataService = project
167-
.withLibrarySettings("hybrid-carbonite-metadata-service")
167+
.withLibrarySettings("hybrid-carbonite-metadata-service", hybridCarboniteMetadataServiceDependencies)
168168
.dependsOn(services)
169169
.dependsOn(engine)
170170
.dependsOn(core % "test->test")
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Used to generate JSON for testing label addition to sub workflows.
2+
name: sub_sub
3+
testFormat: workflowsuccess
4+
tags: [subworkflow]
5+
ignore: true
6+
7+
files {
8+
workflow: sub_sub/top.wdl
9+
imports: [
10+
sub_sub/sub.wdl,
11+
sub_sub/sub_sub.wdl,
12+
sub_sub/sub_sub_sub.wdl
13+
]
14+
}
15+
16+
metadata {
17+
status: Succeeded
18+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import "sub_sub.wdl" as sub_sub
2+
3+
workflow wf {
4+
scatter (i in range(2)) {
5+
call sub_sub.wf
6+
}
7+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import "sub_sub_sub.wdl" as sub_sub_sub
2+
3+
workflow wf {
4+
scatter (i in range(2)) {
5+
call sub_sub_sub.wf
6+
}
7+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
2+
task foo {
3+
command {
4+
echo "foo"
5+
}
6+
}
7+
8+
workflow wf {
9+
scatter (i in range(2)) {
10+
call foo
11+
}
12+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
import "sub.wdl" as sub
2+
3+
workflow wf {
4+
scatter (i in range(2)) {
5+
call sub.wf
6+
}
7+
}

common/src/main/scala/common/util/StringUtil.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,5 +44,11 @@ object StringUtil {
4444
* e.g: /root/some/dir -> root/some/dir/
4545
*/
4646
def relativeDirectory = string.ensureNoLeadingSlash.ensureSlashed
47+
48+
def elided(limit: Int): String = {
49+
if (string.length > limit) {
50+
s"(elided) ${string.take(limit)}..."
51+
} else string
52+
}
4753
}
4854
}

core/src/main/scala/cromwell/util/JsonEditor.scala

Lines changed: 71 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ package cromwell.util
22

33
import cats.data.NonEmptyList
44
import common.collections.EnhancedCollections._
5-
import io.circe.{Json, JsonNumber, JsonObject}
5+
import cromwell.core.WorkflowId
6+
import io.circe.{Json, JsonNumber, JsonObject, Printer}
67
import mouse.all._
78
import io.circe.Json.Folder
89

910
import scala.collection.immutable
11+
import common.util.StringUtil._
1012

1113
object JsonEditor {
1214

@@ -48,7 +50,7 @@ object JsonEditor {
4850
}
4951
}
5052
val jsonObject = Json.fromJsonObject(JsonObject.fromIterable(modified))
51-
val keep = modified.size > 0
53+
val keep = modified.nonEmpty
5254
(jsonObject, keep)
5355
}
5456
}
@@ -72,21 +74,77 @@ object JsonEditor {
7274

7375
def logs(json: Json): Json = includeJson(json, NonEmptyList.of("stdout", "stderr", "backendLogs", "id"))
7476

77+
implicit class EnhancedJson(val json: Json) extends AnyVal {
78+
def rootWorkflowId: Option[WorkflowId] = {
79+
for {
80+
o <- json.asObject
81+
id <- o.kleisli("id")
82+
s <- id.asString
83+
} yield WorkflowId.fromString(s)
84+
}
85+
}
86+
7587
/**
76-
* We only return labels associated with top-level workflows. Subworkflows don't include labels (as of 7/26/19).
77-
*
78-
* Thus this method puts the labels as a top-level field.
88+
* In-memory upsert of labels into the base Json, handling root and sub workflows appropriately.
7989
*
8090
* @param json json blob with or without "labels" field
81-
* @param labels a map of labels one would like to apply to a workflow json
91+
* @param databaseLabels a map of workflow IDs to maps of labels one would like to apply to a workflow json
8292
* @return json with labels merged in. Any prior non-object "labels" field will be overwritten and any object fields will be merged together and - again - any existing values overwritten.
8393
*/
84-
def augmentLabels(json: Json, labels: Map[String, String]): Json = {
85-
val newData: Json = Json.fromFields(labels.safeMapValues(Json.fromString))
86-
val newObj: Json = Json.fromFields(List(("labels", newData)))
87-
//in the event of a key clash, the values in "newObj" will be favored over "json"
88-
json deepMerge newObj
89-
}
94+
def updateLabels(json: Json, databaseLabels: Map[WorkflowId, Map[String, String]]): Json = {
95+
val subWorkflowMetadataKey = "subWorkflowMetadata"
96+
97+
def doUpdateWorkflow(workflowJson: Json): Json = {
98+
val id: String = workflowJson.rootWorkflowId.
99+
getOrElse(throw new RuntimeException(s"did not find workflow id in ${workflowJson.printWith(Printer.spaces2).elided(100)}")).toString
100+
101+
// Look for an optional JsonObject keyed by "calls".
102+
val callsObject: Option[JsonObject] = for {
103+
wo <- workflowJson.asObject
104+
callsJson <- wo("calls")
105+
co <- callsJson.asObject
106+
} yield co
107+
108+
val workflowWithUpdatedCalls: Json = callsObject match {
109+
// If there were no calls just return the workflow JSON unmodified.
110+
case None => workflowJson
111+
case Some(calls) =>
112+
val updatedCallsObject = calls.mapValues {
113+
// The Json (a JSON array, really) corresponding to the array of call objects for a call name.
114+
callValue: Json =>
115+
// The object above converted to a List[Json].
116+
val callArray: Vector[Json] = callValue.asArray.toVector.flatten
90117

91-
def removeSubworkflowData(json: Json): Json = excludeJson(json, NonEmptyList.of("subWorkflowMetadata"))
118+
val updatedCallArray = callArray map { callJson =>
119+
// If there is no subworkflow object this will be None.
120+
val callAndSubworkflowObjects: Option[(JsonObject, Json)] = for {
121+
co <- callJson.asObject
122+
sub <- co(subWorkflowMetadataKey)
123+
_ <- sub.asObject
124+
} yield (co, sub)
125+
126+
callAndSubworkflowObjects match {
127+
case None => callJson
128+
case Some((callObject, subworkflowObject)) =>
129+
// If the call contains a subWorkflowMetadata key, return a copy of the call with
130+
// its subworkflowMetadata updated via a recursive call to `doUpdateWorkflow`.
131+
val updatedSubworkflow = doUpdateWorkflow(subworkflowObject)
132+
Json.fromJsonObject(callObject.add(subWorkflowMetadataKey, updatedSubworkflow))
133+
}
134+
}
135+
Json.fromValues(updatedCallArray)
136+
}
137+
Json.fromJsonObject(workflowJson.asObject.get.add("calls", Json.fromJsonObject(updatedCallsObject)))
138+
}
139+
140+
databaseLabels.get(WorkflowId.fromString(id)) match {
141+
case None => workflowWithUpdatedCalls
142+
case Some(labels) =>
143+
val labelsJson: Json = Json.fromFields(labels safeMapValues Json.fromString)
144+
workflowWithUpdatedCalls deepMerge Json.fromFields(List(("labels", labelsJson)))
145+
}
146+
}
147+
148+
doUpdateWorkflow(workflowJson = json)
149+
}
92150
}

0 commit comments

Comments
 (0)