11package cromwell .services .metadata .hybridcarbonite
22
33import akka .actor .{ActorRef , FSM , LoggingFSM , Props }
4- import cromwell .services .metadata .MetadataService .{GetLabels , GetRootAndSubworkflowLabels , GetStatus , MetadataReadAction , MetadataServiceResponse , QueryForWorkflowsMatchingParameters , WorkflowMetadataReadAction , WorkflowQueryFailure , WorkflowQuerySuccess }
4+ import cromwell .services .metadata .MetadataService .{MetadataReadAction , MetadataServiceResponse , QueryForWorkflowsMatchingParameters , WorkflowMetadataReadAction , WorkflowQueryFailure , WorkflowQuerySuccess }
55import cromwell .services .metadata .hybridcarbonite .HybridReadDeciderActor ._
66import cromwell .services .metadata .impl .builder .MetadataBuilderActor .FailedMetadataResponse
77import cromwell .services .metadata .{MetadataArchiveStatus , WorkflowQueryKey }
@@ -15,27 +15,33 @@ class HybridReadDeciderActor(classicMetadataServiceActor: ActorRef, carboniteMet
1515 implicit val ec : ExecutionContext = context.dispatcher
1616
1717 when(Pending ) {
18- case Event (e @ (_ : GetLabels | _ : GetRootAndSubworkflowLabels | _ : GetStatus | _ : QueryForWorkflowsMatchingParameters ), NoData ) =>
19- // All of these messages should go to the classic metadata service exclusively, no carbonited data required.
20- classicMetadataServiceActor forward e
21- stop(FSM .Normal )
22-
23- case Event (other : WorkflowMetadataReadAction , NoData ) => // GetLogs, WorkflowOutputs, GetMetadataAction
24- classicMetadataServiceActor ! QueryForWorkflowsMatchingParameters (Vector (WorkflowQueryKey .Id .name -> other.workflowId.toString))
25- goto(RequestingMetadataArchiveStatus ) using WorkingData (sender(), other)
18+ case Event (read : MetadataReadAction , NoData ) =>
19+ read match {
20+ case wmra : WorkflowMetadataReadAction =>
21+ classicMetadataServiceActor ! QueryForWorkflowsMatchingParameters (Vector (WorkflowQueryKey .Id .name -> wmra.workflowId.toString))
22+ goto(RequestingMetadataArchiveStatus ) using WorkingData (sender(), read)
23+
24+ case query : QueryForWorkflowsMatchingParameters =>
25+ classicMetadataServiceActor ! query
26+ goto(WaitingForMetadataResponse ) using WorkingData (sender(), read)
27+ }
2628 }
2729
2830 when(RequestingMetadataArchiveStatus ) {
29- case Event (s : WorkflowQuerySuccess , wd : WorkingData ) if s.hasMultipleStatusRows =>
30- val errorMsg = s " Programmer Error: Got more than one status row back looking up metadata archive status for ${wd.request}: ${s.response}"
31- wd.actor ! makeAppropriateFailureForRequest(errorMsg, wd.request)
32- stop(FSM .Failure (errorMsg))
33- case Event (s : WorkflowQuerySuccess , wd : WorkingData ) if s.isCarbonited =>
34- carboniteMetadataServiceActor ! wd.request
35- goto(WaitingForMetadataResponse )
36- case Event (_ : WorkflowQuerySuccess , wd : WorkingData ) =>
37- classicMetadataServiceActor ! wd.request
38- goto(WaitingForMetadataResponse )
31+ case Event (WorkflowQuerySuccess (response, _), wd : WorkingData ) =>
32+ if (response.results.size > 1 ) {
33+ val errorMsg = s " Programmer Error: Got more than one status row back looking up metadata archive status for ${wd.request}: $response"
34+ wd.actor ! makeAppropriateFailureForRequest(errorMsg, wd.request)
35+ stop(FSM .Failure (errorMsg))
36+ } else if (response.results.headOption.exists(_.metadataArchiveStatus == MetadataArchiveStatus .Archived )) {
37+ carboniteMetadataServiceActor ! wd.request
38+ goto(WaitingForMetadataResponse )
39+ } else {
40+ classicMetadataServiceActor ! wd.request
41+ goto(WaitingForMetadataResponse )
42+ }
43+
44+
3945 case Event (WorkflowQueryFailure (reason), wd : WorkingData ) =>
4046 log.error(reason, s " Programmer Error: Failed to determine how to route ${wd.request.getClass.getSimpleName}. Falling back to classic. " )
4147 classicMetadataServiceActor ! wd.request
@@ -80,10 +86,4 @@ object HybridReadDeciderActor {
8086 sealed trait HybridReadDeciderData
8187 case object NoData extends HybridReadDeciderData
8288 final case class WorkingData (actor : ActorRef , request : MetadataReadAction ) extends HybridReadDeciderData
83-
84- implicit class EnhancedWorkflowQuerySuccess (val success : WorkflowQuerySuccess ) extends AnyVal {
85- def hasMultipleStatusRows : Boolean = success.response.results.size > 1
86-
87- def isCarbonited : Boolean = success.response.results.headOption.exists(_.metadataArchiveStatus == MetadataArchiveStatus .Archived )
88- }
8989}
0 commit comments