Skip to content

Commit 014d31e

Browse files
cwperks and cliu123 authored
Replace search sorted by _id to _seq_no for query getting job metadata on sweep (#896)
* Replace search sorted by _id to _seq_no for query getting job metadata on sweep

Signed-off-by: Craig Perkins <cwperx@amazon.com>
Co-authored-by: cliu123 <lc12251109@gmail.com>
1 parent 7c9d58f commit 014d31e

File tree

2 files changed

+128
-14
lines changed

2 files changed

+128
-14
lines changed

src/main/java/org/opensearch/jobscheduler/sweeper/JobSweeper.java

Lines changed: 18 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -331,7 +331,8 @@ private void sweepAllJobIndices() {
331331
this.lastFullSweepTimeNano = System.nanoTime();
332332
}
333333

334-
private void sweepIndex(String indexName) {
334+
@VisibleForTesting
335+
void sweepIndex(String indexName) {
335336
ClusterState clusterState = this.clusterService.state();
336337
// checks to see if index no longer exists
337338
if (!clusterState.routingTable().hasIndex(indexName)) {
@@ -368,14 +369,14 @@ private void sweepIndex(String indexName) {
368369
try {
369370
List<ShardRouting> shardRoutingList = shard.getValue();
370371
List<String> shardNodeIds = shardRoutingList.stream().map(ShardRouting::currentNodeId).collect(Collectors.toList());
371-
sweepShard(shard.getKey(), new ShardNodes(localNodeId, shardNodeIds), null);
372+
sweepShard(shard.getKey(), new ShardNodes(localNodeId, shardNodeIds), -1L);
372373
} catch (Exception e) {
373374
log.info("Error while sweeping shard {}, error message: {}", shard.getKey(), e.getMessage());
374375
}
375376
}
376377
}
377378

378-
private void sweepShard(ShardId shardId, ShardNodes shardNodes, String startAfter) {
379+
private void sweepShard(ShardId shardId, ShardNodes shardNodes, long startAfter) {
379380
ConcurrentHashMap<String, JobDocVersion> currentJobs = this.sweptJobs.containsKey(shardId)
380381
? this.sweptJobs.get(shardId)
381382
: new ConcurrentHashMap<>();
@@ -388,24 +389,27 @@ private void sweepShard(ShardId shardId, ShardNodes shardNodes, String startAfte
388389
}
389390
}
390391

391-
String searchAfter = startAfter == null ? "" : startAfter;
392-
while (searchAfter != null) {
392+
long searchAfter = startAfter;
393+
while (searchAfter >= -1L) {
393394
SearchRequest jobSearchRequest = new SearchRequest().indices(shardId.getIndexName())
394395
.preference("_shards:" + shardId.id() + "|_primary")
395396
.source(
396397
new SearchSourceBuilder().version(true)
397398
.seqNoAndPrimaryTerm(true)
398-
.sort(new FieldSortBuilder("_id").unmappedType("keyword").missing("_last"))
399-
.searchAfter(new String[] { searchAfter })
399+
.sort(new FieldSortBuilder("_seq_no").unmappedType("long"))
400+
.searchAfter(new Long[] { searchAfter })
400401
.size(this.sweepPageMaxSize)
401402
.query(QueryBuilders.matchAllQuery())
402403
);
403404

404-
SearchResponse response = this.retry(
405-
(searchRequest) -> this.client.search(searchRequest),
406-
jobSearchRequest,
407-
this.sweepSearchBackoff
408-
).actionGet(this.sweepSearchTimeout);
405+
SearchResponse response;
406+
try {
407+
response = this.retry((searchRequest) -> this.client.search(searchRequest), jobSearchRequest, this.sweepSearchBackoff)
408+
.actionGet(this.sweepSearchTimeout);
409+
} catch (Exception e) {
410+
log.error("Aborting sweep of shard {}, will retry on next sweep cycle.", shardId, e);
411+
return;
412+
}
409413
if (response.status() != RestStatus.OK) {
410414
log.error("Error sweeping shard {}, failed querying jobs on this shard", shardId);
411415
return;
@@ -422,10 +426,10 @@ private void sweepShard(ShardId shardId, ShardNodes shardNodes, String startAfte
422426
}
423427
}
424428
if (response.getHits() == null || response.getHits().getHits().length < 1) {
425-
searchAfter = null;
429+
break;
426430
} else {
427431
SearchHit lastHit = response.getHits().getHits()[response.getHits().getHits().length - 1];
428-
searchAfter = lastHit.getId();
432+
searchAfter = lastHit.getSeqNo();
429433
}
430434
}
431435
}

src/test/java/org/opensearch/jobscheduler/sweeper/JobSweeperTests.java

Lines changed: 110 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@
2121
import org.apache.lucene.util.BytesRef;
2222
import org.opensearch.Version;
2323
import org.opensearch.action.delete.DeleteResponse;
24+
import org.opensearch.action.search.SearchResponse;
2425
import org.opensearch.cluster.ClusterName;
2526
import org.opensearch.cluster.ClusterState;
2627
import org.opensearch.cluster.OpenSearchAllocationTestCase;
@@ -38,13 +39,17 @@
3839
import org.opensearch.common.action.ActionFuture;
3940
import org.opensearch.common.settings.ClusterSettings;
4041
import org.opensearch.common.settings.Setting;
42+
import org.opensearch.common.unit.TimeValue;
4143
import org.opensearch.common.settings.Settings;
4244
import org.opensearch.core.xcontent.NamedXContentRegistry;
4345
import org.opensearch.core.index.Index;
4446
import org.opensearch.index.engine.Engine;
4547
import org.opensearch.index.mapper.ParseContext;
4648
import org.opensearch.index.mapper.ParsedDocument;
4749
import org.opensearch.core.index.shard.ShardId;
50+
import org.opensearch.core.rest.RestStatus;
51+
import org.opensearch.search.SearchHit;
52+
import org.opensearch.search.SearchHits;
4853
import org.opensearch.test.ClusterServiceUtils;
4954
import org.opensearch.test.OpenSearchTestCase;
5055
import org.opensearch.threadpool.Scheduler;
@@ -275,6 +280,111 @@ public void testSweep() throws IOException {
275280
);
276281
}
277282

283+
public void testSweepUsesSeqNoSort() throws IOException {
284+
SearchHit hit = new SearchHit(1, "doc-id", null, null);
285+
hit.sourceRef(this.getTestJsonSource());
286+
hit.setSeqNo(42L);
287+
hit.setPrimaryTerm(1L);
288+
SearchHits hits = new SearchHits(new SearchHit[] { hit }, null, 1.0f);
289+
290+
SearchHits emptyHits = new SearchHits(new SearchHit[0], null, 1.0f);
291+
292+
SearchResponse firstResponse = Mockito.mock(SearchResponse.class);
293+
Mockito.when(firstResponse.status()).thenReturn(RestStatus.OK);
294+
Mockito.when(firstResponse.getHits()).thenReturn(hits);
295+
296+
SearchResponse secondResponse = Mockito.mock(SearchResponse.class);
297+
Mockito.when(secondResponse.status()).thenReturn(RestStatus.OK);
298+
Mockito.when(secondResponse.getHits()).thenReturn(emptyHits);
299+
300+
ActionFuture<SearchResponse> firstFuture = Mockito.mock(ActionFuture.class);
301+
Mockito.when(firstFuture.actionGet(Mockito.any(TimeValue.class))).thenReturn(firstResponse);
302+
303+
ActionFuture<SearchResponse> secondFuture = Mockito.mock(ActionFuture.class);
304+
Mockito.when(secondFuture.actionGet(Mockito.any(TimeValue.class))).thenReturn(secondResponse);
305+
306+
Mockito.when(this.client.search(Mockito.any())).thenReturn(firstFuture).thenReturn(secondFuture);
307+
308+
JobSweeper testSweeper = Mockito.spy(this.sweeper);
309+
Mockito.doNothing()
310+
.when(testSweeper)
311+
.sweep(Mockito.any(), Mockito.anyString(), Mockito.any(BytesReference.class), Mockito.any(JobDocVersion.class));
312+
313+
ClusterState clusterState = buildSingleShardClusterState("index-name");
314+
Mockito.when(this.clusterService.state()).thenReturn(clusterState);
315+
316+
testSweeper.sweepIndex("index-name");
317+
318+
// verify search was called twice: once for the page with the hit, once for the empty page
319+
Mockito.verify(this.client, Mockito.times(2)).search(Mockito.any());
320+
}
321+
322+
public void testSweepAbortsOnNonOkResponse() {
323+
SearchResponse badResponse = Mockito.mock(SearchResponse.class);
324+
Mockito.when(badResponse.status()).thenReturn(RestStatus.INTERNAL_SERVER_ERROR);
325+
326+
ActionFuture<SearchResponse> future = Mockito.mock(ActionFuture.class);
327+
Mockito.when(future.actionGet(Mockito.any(TimeValue.class))).thenReturn(badResponse);
328+
Mockito.when(this.client.search(Mockito.any())).thenReturn(future);
329+
330+
JobSweeper testSweeper = Mockito.spy(this.sweeper);
331+
Mockito.doNothing()
332+
.when(testSweeper)
333+
.sweep(Mockito.any(), Mockito.anyString(), Mockito.any(BytesReference.class), Mockito.any(JobDocVersion.class));
334+
335+
ClusterState clusterState = buildSingleShardClusterState("index-name");
336+
Mockito.when(this.clusterService.state()).thenReturn(clusterState);
337+
338+
testSweeper.sweepIndex("index-name");
339+
340+
// search was called once, but sweep was never called due to non-OK status
341+
Mockito.verify(this.client, Mockito.times(1)).search(Mockito.any());
342+
Mockito.verify(testSweeper, Mockito.times(0))
343+
.sweep(Mockito.any(), Mockito.anyString(), Mockito.any(BytesReference.class), Mockito.any(JobDocVersion.class));
344+
}
345+
346+
public void testSweepAbortsOnSearchException() {
347+
ActionFuture<SearchResponse> failingFuture = Mockito.mock(ActionFuture.class);
348+
Mockito.when(failingFuture.actionGet(Mockito.any(TimeValue.class)))
349+
.thenThrow(new RuntimeException("fielddata access on _id disallowed"));
350+
Mockito.when(this.client.search(Mockito.any())).thenReturn(failingFuture);
351+
352+
JobSweeper testSweeper = Mockito.spy(this.sweeper);
353+
Mockito.doNothing()
354+
.when(testSweeper)
355+
.sweep(Mockito.any(), Mockito.anyString(), Mockito.any(BytesReference.class), Mockito.any(JobDocVersion.class));
356+
357+
ClusterState clusterState = buildSingleShardClusterState("index-name");
358+
Mockito.when(this.clusterService.state()).thenReturn(clusterState);
359+
360+
// should not throw — exception is caught and logged
361+
testSweeper.sweepIndex("index-name");
362+
363+
// search was attempted once before the exception aborted the loop
364+
Mockito.verify(this.client, Mockito.times(1)).search(Mockito.any());
365+
Mockito.verify(testSweeper, Mockito.times(0))
366+
.sweep(Mockito.any(), Mockito.anyString(), Mockito.any(BytesReference.class), Mockito.any(JobDocVersion.class));
367+
}
368+
369+
private ClusterState buildSingleShardClusterState(String indexName) {
370+
Metadata metadata = Metadata.builder().put(createIndexMetadata(indexName, 0, 1)).build();
371+
RoutingTable routingTable = new RoutingTable.Builder().add(
372+
new IndexRoutingTable.Builder(metadata.index(indexName).getIndex()).initializeAsNew(metadata.index(indexName)).build()
373+
).build();
374+
ClusterState clusterState = ClusterState.builder(new ClusterName("cluster-name"))
375+
.metadata(metadata)
376+
.routingTable(routingTable)
377+
.build();
378+
clusterState = this.addNodesToCluter(clusterState, 1);
379+
clusterState = this.initializeAllShards(clusterState);
380+
// set local node so getLocalShards can match shards assigned to this node
381+
String firstNodeId = clusterState.getNodes().iterator().next().getId();
382+
clusterState = ClusterState.builder(clusterState)
383+
.nodes(DiscoveryNodes.builder(clusterState.getNodes()).localNodeId(firstNodeId))
384+
.build();
385+
return clusterState;
386+
}
387+
278388
private ClusterState addNodesToCluter(ClusterState clusterState, int nodeCount) {
279389
DiscoveryNodes.Builder nodeBuilder = DiscoveryNodes.builder();
280390
for (int i = 1; i <= nodeCount; i++) {

0 commit comments

Comments
 (0)