-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy pathVerificationServer.scala
More file actions
326 lines (291 loc) · 13.3 KB
/
Copy pathVerificationServer.scala
File metadata and controls
326 lines (291 loc) · 13.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2011-2020 ETH Zurich.
package viper.server.vsi
import akka.Done
import akka.actor.{ActorRef, ActorSystem, PoisonPill, Status}
import akka.pattern.ask
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.OverflowStrategy
import akka.util.Timeout
import viper.server.core.VerificationExecutionContext
import scala.concurrent.duration._
import scala.concurrent.Future
import scala.reflect.ClassTag
import scala.util.{Failure, Success}
import scala.language.postfixOps
/** Common supertype of the exceptions declared for the verification server interface. */
abstract class VerificationServerException extends Exception
/** Singleton exception value. Presumably signals that a job id has no entry in a job pool;
 * it is not raised anywhere in this file -- confirm against callers. */
case object JobNotFoundException extends VerificationServerException
/** Base class for failures of the AST construction phase. `VerificationServer.initializeProcess`
 * handles these by discarding the dependent job from its pool. */
abstract class AstConstructionException extends VerificationServerException
/** This trait provides state and process management functionality for verification servers.
 *
 * The server runs on Akka's actor system. This means that the entire server's state
 * and process management are run by actors. The three actors in charge are:
 *
 * 1) Job Actor
 * 2) Queue Actor
 * 3) Terminator Actor
 *
 * The first two actors manage individual verification processes, i.e., on
 * initializeVerificationProcess() an instance of each actor is created. The JobActor launches
 * the actual VerificationTask, while the QueueActor acts as a middleman for communication
 * between a VerificationTask's backend and the server. The Terminator Actor is in charge of
 * terminating both individual processes and the server.
 */
trait VerificationServer extends Post {
// Type of the ASTs produced by AST construction jobs; fixed by the concrete server.
type AST
// Execution context supplied by the implementor; it also provides the actor system below.
implicit val executor: VerificationExecutionContext
implicit val system: ActorSystem = executor.actorSystem
// Timeout applied to the `ask` (`?`) requests sent to job actors.
implicit def askTimeout: Timeout
// Terminator actor: assigned by start(), sent Terminator.Exit and reset to null by stop().
protected var _termActor: ActorRef = _
// Implicit factories turning a plain Int into a typed job identifier.
// NOTE(review): presumably consumed by JobPool when it allocates identifiers -- confirm.
implicit val ast_id_fact: Int => AstJobId = AstJobId.apply
implicit val ver_id_fact: Int => VerJobId = VerJobId.apply
// Job pools for the AST construction and verification phases. They are left uninitialized
// here and must be assigned by start() (see DefaultVerificationServerStart).
protected var ast_jobs: JobPool[AstJobId, AstHandle[Option[AST]]] = _
protected var ver_jobs: JobPool[VerJobId, VerHandle] = _
// True between start() and stop(); the job-management methods of this trait throw an
// IllegalStateException while it is false.
var isRunning: Boolean = false
/** Configures an instance of VerificationServer.
 *
 * This function must be called before any other. Calling any other function before this one
 * will result in an IllegalStateException, because those functions check `isRunning` first.
 * The returned future resolves when the server has been started.
 *
 * Implementations are expected to initialize `ast_jobs`, `ver_jobs` and `_termActor` and to
 * set `isRunning`; a default implementation is provided in DefaultVerificationServerStart.
 *
 * @param active_jobs value handed to both job pools by the default implementation
 *                    (presumably the maximum number of concurrently active jobs -- confirm
 *                    against JobPool).
 * @return a future that resolves once the server has been started.
 */
def start(active_jobs: Int): Future[Done]
/** Books a new job in `pool` and, once its task is available, spawns the actors that run it.
  *
  * The job identifier is returned right away; the job handle is produced asynchronously by the
  * function handed to `pool.bookNewJob`.
  *
  * If `task_maybe_fut` yields `None`, the job is removed from the pool again. For verification
  * jobs a placeholder `VerHandle(null, null, null, prev_job_id_maybe)` is produced in that case;
  * for any other kind of job the handle future fails with an `IllegalStateException`.
  *
  * If `task_maybe_fut` (or the job start-up) fails with an [[AstConstructionException]], the job
  * is removed from the pool and the handle future fails with that same exception.
  *
  * @param pool                pool in which the job is booked.
  * @param task_maybe_fut      eventually yields the task to run, or `None` if there is none.
  * @param discardOnCompletion if true, the job is removed from `pool` and its job actor is
  *                            stopped as soon as the job's message queue completes.
  * @param prev_job_id_maybe   identifier of the AST construction job this job depends on, if any.
  * @return the identifier under which the job was booked.
  * @throws IllegalStateException    if the server is not running.
  * @throws IllegalArgumentException if `pool` does not accept new jobs (raised by `require`).
  */
protected def initializeProcess[S <: JobId, T <: JobHandle : ClassTag]
    (pool: JobPool[S, T],
     task_maybe_fut: Future[Option[MessageStreamingTask[_]]],
     discardOnCompletion: Boolean,
     prev_job_id_maybe: Option[AstJobId] = None): S = {
  if (!isRunning) {
    throw new IllegalStateException("Instance of VerificationServer already stopped")
  }
  require(pool.newJobsAllowed)

  // Ask the pool to book a new job; the function below constructs the Future[JobHandle] later on.
  pool.bookNewJob((new_jid: S) => task_maybe_fut.flatMap((task_maybe: Option[MessageStreamingTask[_]]) => {
    task_maybe match {
      case None =>
        // No task means the prerequisite tasks haven't produced usable artifacts. The sole
        // purpose of this job is hence to hold the identifier of its predecessor, so it is
        // removed from the job pool.
        pool.discardJob(new_jid)
        new_jid match {
          case _: VerJobId =>
            Future.successful(VerHandle(null, null, null, prev_job_id_maybe))
          case other =>
            // Only verification jobs have a placeholder handle; fail explicitly instead of
            // surfacing an anonymous MatchError.
            Future.failed(new IllegalStateException(s"no task available for job $other"))
        }

      case Some(task) =>
        // What we really want here is SourceQueueWithComplete[Envelope]. Publisher[Envelope]
        // might be needed to create a stream later on, but the publisher and the queue are
        // synchronized and should be viewed as different representations of the same concept.
        // TODO avoid hardcoded parameters
        val (queue, publisher) = Source.queue[Envelope](10000, OverflowStrategy.backpressure)
          .toMat(Sink.asPublisher(false))(Keep.both).run()

        // This actor is responsible for managing ONE queue, whereas the JobActor can manage
        // multiple tasks, all of which are related to some pipeline, e.g.
        //   [Text] ---> [AST] ---> [VerificationResult]
        //   '--- Task I ----'                   |
        //   '---------- Task II ----------------'
        val message_actor = system.actorOf(QueueActor.props(queue), s"${pool.tag}--message_actor--${new_jid.id}")
        task.setQueueActor(message_actor)
        val job_actor = system.actorOf(JobActor.props(new_jid), s"${pool.tag}_job_actor_${new_jid}")

        // Register the cleanup task.
        queue.watchCompletion().onComplete(_ => {
          if (discardOnCompletion) {
            pool.discardJob(new_jid)
            // FIXME: if the job actors are meant to be reused from one phase to another (only
            // FIXME: partially implemented), then they should be stopped only after the **last**
            // FIXME: job is completed in the pipeline.
            job_actor ! PoisonPill
          }
        })

        // TODO: Use factories for specializing the messages.
        // TODO: Clearly, there should be a clean separation between concrete job types
        // TODO: (AST Construction, Verification) and generic types (JobHandle).
        (job_actor ? (new_jid match {
          case _: AstJobId =>
            VerificationProtocol.ConstructAst(task, queue, publisher, executor)
          case _: VerJobId =>
            VerificationProtocol.Verify(task, queue, publisher, prev_job_id_maybe, executor)
        })).mapTo[T]
    }
  }).recoverWith({
    case e: AstConstructionException =>
      // If the AST construction phase failed, remove the verification job handle from the
      // corresponding pool and keep the original failure as the outcome of the handle future.
      val prev_job = prev_job_id_maybe.map(_.toString).getOrElse("<unknown>")
      println(s"AST construction job $prev_job resulted in a failure: $e")
      pool.discardJob(new_jid)
      Future.failed[T](e)
  }).mapTo[T])
}
/** Books an AST construction job for `task`.
  *
  * The job is booked with `discardOnCompletion = false`, i.e. it is not removed from the AST
  * pool automatically when its message queue completes (see `discardAstOnCompletion`).
  *
  * @param task the AST construction task to run.
  * @return the identifier of the booked job, or `AstJobId(-1)` if the AST pool currently
  *         accepts no new jobs.
  * @throws IllegalStateException if the server is not running.
  */
protected def initializeAstConstruction(task: MessageStreamingTask[Option[AST]]): AstJobId = {
  if (!isRunning) {
    throw new IllegalStateException("Instance of VerificationServer already stopped")
  }
  if (!ast_jobs.newJobsAllowed) {
    // Process management is running at max capacity.
    AstJobId(-1)
  } else {
    initializeProcess(ast_jobs, Future.successful(Some(task)), discardOnCompletion = false)
  }
}
/** Arranges for AST job `jid` to be cleaned up once its message queue has completed.
  *
  * When the queue completes, the job is removed from the AST pool and `jobActor` is sent a
  * `PoisonPill`. Nothing is registered if `jid` is not in the AST pool.
  *
  * @param jid      the AST construction job to clean up.
  * @param jobActor the actor to stop together with the job.
  */
protected def discardAstOnCompletion(jid: AstJobId, jobActor: ActorRef) = {
  ast_jobs.lookupJob(jid).map { handle_fut =>
    handle_fut.map { handle =>
      handle.queue.watchCompletion().onComplete { _ =>
        ast_jobs.discardJob(jid)
        jobActor ! PoisonPill
      }
    }
  }
}
/** Books a verification job.
  *
  * The job is booked with `discardOnCompletion = true`, so it is removed from the verification
  * pool (and its job actor stopped) once its message queue completes.
  *
  * @param task_maybe_fut   eventually yields the verification task that is passed on to the
  *                         JobActor, or `None` if there is nothing to verify.
  * @param ast_job_id_maybe identifier of the AST construction job this verification depends
  *                         on, if any.
  * @return the identifier of the booked job, or `VerJobId(-1)` if the verification pool
  *         currently accepts no new jobs.
  * @throws IllegalStateException if the server is not running.
  */
protected def initializeVerificationProcess(task_maybe_fut: Future[Option[MessageStreamingTask[Unit]]],
                                            ast_job_id_maybe: Option[AstJobId]): VerJobId = {
  if (!isRunning) {
    throw new IllegalStateException("Instance of VerificationServer already stopped")
  }
  if (!ver_jobs.newJobsAllowed) {
    // Process management is running at max capacity.
    VerJobId(-1)
  } else {
    initializeProcess(ver_jobs, task_maybe_fut, discardOnCompletion = true, prev_job_id_maybe = ast_job_id_maybe)
  }
}
/** Streams the messages produced for verification job `jid` to `clientActor`.
  *
  * Every message is passed through `unpack` before it is sent. The stream ends in
  * `Sink.actorRef(clientActor, Status.Success, Status.Failure)`, so the actor is notified of
  * completion and failure through those two values.
  *
  * @param jid         the verification job whose messages are streamed.
  * @param clientActor the recipient of the messages.
  * @param full        if true, the messages of the AST construction job this job depends on are
  *                    streamed ahead of the verification messages, provided that job is still
  *                    in the AST pool.
  * @return `None` if `jid` is not in the verification pool. Otherwise a future that completes
  *         when the job's message queue completes. For a placeholder handle (no verification
  *         task was run, hence no queue) the future follows the AST queue when AST messages
  *         are being streamed, and is already completed otherwise.
  * @throws IllegalStateException if the server is not running.
  */
protected def streamMessages(jid: VerJobId, clientActor: ActorRef, full: Boolean): Option[Future[Done]] = {
  if (!isRunning) {
    throw new IllegalStateException("Instance of VerificationServer already stopped")
  }
  // `None` here means that the verification job was not found.
  ver_jobs.lookupJob(jid).map { handle_future =>
    handle_future.flatMap((ver_handle: VerHandle) => {
      ver_handle.prev_job_id match {
        case _ if !full =>
          // The AST for this verification job is already being streamed.
          Future.successful((None, ver_handle))
        case None =>
          // The AST for this verification job wasn't created by this server.
          Future.successful((None, ver_handle))
        case Some(ast_id) =>
          // The AST construction job may have been cleaned up
          // (if all of its messages were already consumed).
          ast_jobs.lookupJob(ast_id) match {
            case Some(ast_handle_fut) =>
              ast_handle_fut.map(ast_handle => (Some(ast_handle), ver_handle))
            case None =>
              Future.successful((None, ver_handle))
          }
      }
    }).flatMap {
      case (ast_handle_maybe: Option[AstHandle[Option[AST]]], ver_handle: VerHandle) =>
        val ver_source = ver_handle match {
          case VerHandle(null, null, null, _) =>
            // There were no messages produced during verification.
            Source.empty[Envelope]
          case _ =>
            Source.fromPublisher(ver_handle.publisher)
        }
        val ast_source = ast_handle_maybe match {
          case None =>
            // The AST messages were already consumed (or were not requested).
            Source.empty[Envelope]
          case Some(ast_handle) =>
            Source.fromPublisher(ast_handle.publisher)
        }
        val resulting_source = if (full) ver_source.prepend(ast_source) else ver_source
        resulting_source
          .map(e => unpack(e))
          .runWith(Sink.actorRef(clientActor, Status.Success, Status.Failure))

        // FIXME This assumes that someone will actually complete the verification job queue.
        // FIXME Could we guarantee that the client won't forget to do this?
        Option(ver_handle.queue) match {
          case Some(ver_queue) =>
            ver_queue.watchCompletion()
          case None =>
            // Placeholder handle: there is no verification queue to watch, and dereferencing
            // it would throw. Follow the AST queue if one feeds the stream.
            ast_handle_maybe.fold(Future.successful[Done](Done))(_.queue.watchCompletion())
        }
    }
  }
}
/** Streams the messages produced by AST construction job `jid` to `clientActor`.
  *
  * Every message is passed through `unpack` before it is sent. The stream ends in
  * `Sink.actorRef(clientActor, Status.Success, Status.Failure)`.
  *
  * @param jid         the AST construction job whose messages are streamed.
  * @param clientActor the recipient of the messages.
  * @return `None` if `jid` is not in the AST pool; otherwise a future that completes when the
  *         job's message queue completes.
  * @throws IllegalStateException if the server is not running.
  */
protected def streamMessages(jid: AstJobId, clientActor: ActorRef): Option[Future[Done]] = {
  if (!isRunning) {
    throw new IllegalStateException("Instance of VerificationServer already stopped")
  }
  ast_jobs.lookupJob(jid).map { handle_fut =>
    handle_fut.flatMap { handle =>
      val messages = Source.fromPublisher(handle.publisher).map(e => unpack(e))
      messages.runWith(Sink.actorRef(clientActor, Status.Success, Status.Failure))
      handle.queue.watchCompletion()
    }
  }
}
/** Stops this instance of VerificationServer.
  *
  * Active jobs are asked to stop; once their replies are in (or the request has failed), the
  * terminator actor is sent `Terminator.Exit` and the reference to it is cleared.
  * The actor system and executor do not get terminated and are the responsibility of the caller.
  *
  * This should be the last method called. Calling any other function after stop will
  * result in an IllegalStateException.
  *
  * @return the replies of the interrupted jobs, as produced by `getInterruptFutureList`.
  * @throws IllegalStateException if the server is not running.
  */
def stop(): Future[List[String]] = {
  if (!isRunning) {
    throw new IllegalStateException("Instance of VerificationServer already stopped")
  }
  isRunning = false

  getInterruptFutureList().transform { outcome =>
    _termActor ! Terminator.Exit
    val notice = if (outcome.isSuccess) "shutting down..." else "forcibly shutting down..."
    println(notice)
    // Drop the terminator since it is no longer needed. Otherwise, start() cannot be called.
    _termActor = null
    outcome
  }
}
/** Asks the job actor of every job in both pools to stop; used upon termination of the server.
  *
  * AST construction jobs are sent `StopAstConstruction`, verification jobs `StopVerification`.
  * Each request uses a local timeout of 1000 milliseconds instead of the trait-wide one.
  *
  * @return a future holding the replies of all job actors; it fails if any request fails.
  */
protected def getInterruptFutureList(): Future[List[String]] = {
  // Deliberately named like the trait member so that it shadows it for the asks below.
  implicit val askTimeout: Timeout = Timeout(1000.milliseconds)

  val all_handles = ver_jobs.jobHandles ++ ast_jobs.jobHandles
  val stop_requests: List[Future[String]] = all_handles.map {
    case (_, handle_future) =>
      handle_future.flatMap {
        case AstHandle(actor, _, _, _) =>
          (actor ? VerificationProtocol.StopAstConstruction).mapTo[String]
        case VerHandle(actor, _, _, _) =>
          (actor ? VerificationProtocol.StopVerification).mapTo[String]
      }
  }.toList

  Future.sequence(stop_requests)
}
}
/** Default implementation of [[VerificationServer.start]]: creates both job pools and the
  * terminator actor, then marks the server as running.
  */
trait DefaultVerificationServerStart extends VerificationServer {

  /** Starts the server.
    *
    * @param active_jobs value handed to both job pools on construction.
    * @return an already completed future.
    */
  override def start(active_jobs: Int): Future[Done] = {
    val ast_pool: JobPool[AstJobId, AstHandle[Option[AST]]] = new JobPool("VSI-AST-pool", active_jobs)
    val ver_pool: JobPool[VerJobId, VerHandle] = new JobPool("VSI-Verification-pool", active_jobs)
    ast_jobs = ast_pool
    ver_jobs = ver_pool
    // The terminator needs both pools, so it is created only after they exist.
    _termActor = system.actorOf(Terminator.props(ast_pool, ver_pool), Terminator.GetNextTerminatorName)
    isRunning = true
    Future.successful(Done)
  }
}