11package org.ivdnt.galahad.jobs
22
33import org.ivdnt.galahad.annotations.SOURCE_LAYER_NAME
4+ import org.ivdnt.galahad.documents.Document
45import org.ivdnt.galahad.exceptions.SourceLayerNotATaggerException
5- import org.ivdnt.galahad.exceptions.TaggerNoConnectionException
66import org.ivdnt.galahad.formats.InternalFile
77import org.ivdnt.galahad.taggers.Tagger
88import org.springframework.core.io.FileSystemResource
99import org.springframework.http.HttpEntity
1010import org.springframework.http.HttpHeaders
11- import org.springframework.http.HttpMethod
1211import org.springframework.http.HttpStatus
1312import org.springframework.http.MediaType
1413import org.springframework.util.LinkedMultiValueMap
15- import org.springframework.util.MultiValueMap
16- import org.springframework.web.client.ResourceAccessException
1714import org.springframework.web.client.RestTemplate
1815import org.springframework.web.client.postForEntity
19- import org.springframework.web.util.UriComponentsBuilder
2016import java.io.File
21- import java.net.URI
2217import java.util.UUID
18+ import kotlin.collections.ArrayDeque
19+ import kotlin.collections.count
20+ import kotlin.collections.minusAssign
21+ import kotlin.collections.plusAssign
2322import kotlin.io.path.createTempFile
2423
2524object JobController {
26- private const val DOC_PARALLELIZATION_SIZE = 3
27-
28- val queue: ArrayDeque <Job > = ArrayDeque <Job >()
29- val tasks: MutableMap <UUID , Task > = mutableMapOf ()
30- var active: Boolean = false
31-
32- class Task (val job : Job , val document : String ) {
33- fun finish (file : File ) {
34- job.results.createOrThrow(document).layer = InternalFile .create(file).layer
35- }
36- }
25+ private val queue: ArrayDeque <Job > = ArrayDeque <Job >()
26+ private var task: Task ? = null
3727
3828 fun queue (job : Job ) {
3929 if (job.name == SOURCE_LAYER_NAME ) throw SourceLayerNotATaggerException ()
@@ -44,36 +34,63 @@ object JobController {
4434 fun unqueue (job : Job ) {
4535 if (job.name == SOURCE_LAYER_NAME ) throw SourceLayerNotATaggerException ()
4636 queue - = job
47- // TODO: stop
37+ stop(job)
38+ }
39+
40+ fun receive (uuid : UUID , file : File ) {
41+ if (task == null || uuid != task?.uuid) {
42+ throw Exception (" No task found for UUID $uuid " )
43+ }
44+ task!! .finish(file)
45+ // if no untagged documents are left, remove the job from the queue
46+ val numUntagged = task!! .job.corpus.documents.readAll().count { task!! .job.results.readOrNull(it.name) == null }
47+ if (numUntagged == 0 ) {
48+ queue - = task!! .job
49+ }
50+ // next document, or next job if all documents are tagged
51+ start()
4852 }
4953
50- fun start () {
51- if (! active) {
52- // first job if exists
54+ private fun start () {
55+ // Only one task at a time.
56+ if (task == null ) {
57+ // First job in queue if it exists.
5358 queue.firstOrNull()?.let { job ->
54- // get all docs each time, because new ones might be added while tagging
59+ // Get all docs each time, because new ones might be added while tagging.
5560 val docs = job.corpus.documents.readAllSequence()
56- // untagged are those for which exists no result
61+ // Untagged are those for which no result exists.
5762 val untagged = docs.filter { job.results.readOrNull(it.name) == null }
58- // create a task for the first one
59- val task = Task (job, untagged.first().name)
60- val id = tag(task)
61- tasks[id] = task
63+ // Is there even an untagged document?
64+ val doc = untagged.firstOrNull()
65+ if (doc == null ) {
66+ // Somehow no untagged documents left, remove the job from the queue anyway.
67+ queue - = job
68+ } else {
69+ // Tag and register task.
70+ val id = tag(job, doc)
71+ task = Task (id, job, doc.name)
72+ }
6273 }
6374 }
6475 }
6576
66- fun tag (task : Task ): UUID {
67- active = true
68- val url = " ${Tagger .readOrThrow(task.job.name).url} /input"
69- val text = task.job.corpus.jobs.readOrThrow(SOURCE_LAYER_NAME ).results.readOrThrow(task.document).layer.toString()
77+ private fun stop (job : Job ) {
78+ if (task?.job != job) {
79+ return // No task to stop.
80+ }
81+ // Send stop signal to tagger.
82+ val url = " ${Tagger .readOrThrow(job.name).url} /input/${task?.uuid} "
83+ RestTemplate ().delete(url)
84+ task = null
85+ }
86+
87+ private fun tag (job : Job , doc : Document ): UUID {
88+ val url = " ${Tagger .readOrThrow(job.name).url} /input"
89+ val text = job.corpus.jobs.readOrThrow(SOURCE_LAYER_NAME ).results.readOrThrow(doc.name).layer.toString()
7090 val file = createTempFile().toFile().also { it.writeText(text) }
71- val entity = HttpEntity (
72- LinkedMultiValueMap <String , Any >().apply {
73- add(" file" , FileSystemResource (file))
74- },
75- HttpHeaders ().apply { contentType = MediaType .MULTIPART_FORM_DATA }
76- )
91+ val entity = HttpEntity (LinkedMultiValueMap <String , Any >().apply {
92+ add(" file" , FileSystemResource (file))
93+ }, HttpHeaders ().apply { contentType = MediaType .MULTIPART_FORM_DATA })
7794 val response = RestTemplate ().postForEntity<String >(
7895 url, entity
7996 )
@@ -83,18 +100,9 @@ object JobController {
83100 return UUID .fromString(response.body) ? : throw Exception (" No UUID received from tagger" )
84101 }
85102
86- fun receive (uuid : UUID , file : File ) {
87- val task = tasks[uuid] ? : throw Exception (" No task found for UUID $uuid " )
88- task.finish(file)
89- active = false
90- tasks.remove(uuid)
91- // if no untagged documents are left, remove the job from the queue
92- val numUntagged = task.job.corpus.documents.readAll()
93- .count { task.job.results.readOrNull(it.name) == null }
94- if (numUntagged == 0 ) {
95- queue - = task.job
103+ private class Task (val uuid : UUID , val job : Job , val doc : String ) {
104+ fun finish (file : File ) {
105+ job.results.createOrThrow(doc).layer = InternalFile .create(file).layer
96106 }
97- // next document, or next job if all documents are tagged
98- start()
99107 }
100108}
0 commit comments