Skip to content

Commit b5b45bc

Browse files
authored
Confluece Search API timezone fix (opensearch-project#5500)
* Adding crawler logic to apply pollingTimezoneOffset in seconds Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
1 parent 6e6ab6e commit b5b45bc

File tree

13 files changed

+257
-43
lines changed

13 files changed

+257
-43
lines changed

data-prepper-plugins/saas-source-plugins/confluence-source/src/main/java/org/opensearch/dataprepper/plugins/source/confluence/ConfluenceService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.opensearch.dataprepper.plugins.source.confluence.models.ConfluenceItem;
1818
import org.opensearch.dataprepper.plugins.source.confluence.models.ConfluencePaginationLinks;
1919
import org.opensearch.dataprepper.plugins.source.confluence.models.ConfluenceSearchResults;
20+
import org.opensearch.dataprepper.plugins.source.confluence.models.ConfluenceServerMetadata;
2021
import org.opensearch.dataprepper.plugins.source.confluence.rest.ConfluenceRestClient;
2122
import org.opensearch.dataprepper.plugins.source.confluence.utils.ConfluenceConfigHelper;
2223
import org.opensearch.dataprepper.plugins.source.confluence.utils.ConfluenceContentType;
@@ -90,6 +91,10 @@ public String getContent(String contentId) {
9091
return confluenceRestClient.getContent(contentId);
9192
}
9293

94+
public ConfluenceServerMetadata getConfluenceServerMetadata() {
95+
return confluenceRestClient.getConfluenceServerMetadata();
96+
}
97+
9398
/**
9499
* Method for building Content Item Info.
95100
*

data-prepper-plugins/saas-source-plugins/confluence-source/src/main/java/org/opensearch/dataprepper/plugins/source/confluence/ConfluenceSource.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public class ConfluenceSource extends CrawlerSourcePlugin {
4747
private static final Logger log = LoggerFactory.getLogger(ConfluenceSource.class);
4848
private final ConfluenceSourceConfig confluenceSourceConfig;
4949
private final AtlassianAuthConfig jiraOauthConfig;
50+
private final ConfluenceService service;
5051

5152
@DataPrepperPluginConstructor
5253
public ConfluenceSource(final PluginMetrics pluginMetrics,
@@ -55,18 +56,21 @@ public ConfluenceSource(final PluginMetrics pluginMetrics,
5556
final PluginFactory pluginFactory,
5657
final AcknowledgementSetManager acknowledgementSetManager,
5758
Crawler crawler,
58-
PluginExecutorServiceProvider executorServiceProvider) {
59+
PluginExecutorServiceProvider executorServiceProvider,
60+
final ConfluenceService service) {
5961
super(PLUGIN_NAME, pluginMetrics, confluenceSourceConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider);
6062
log.info("Creating Confluence Source Plugin");
6163
this.confluenceSourceConfig = confluenceSourceConfig;
6264
this.jiraOauthConfig = jiraOauthConfig;
65+
this.service = service;
6366
}
6467

6568
@Override
6669
public void start(Buffer<Record<Event>> buffer) {
6770
log.info("Starting Confluence Source Plugin... ");
6871
ConfluenceConfigHelper.validateConfig(confluenceSourceConfig);
6972
jiraOauthConfig.initCredentials();
73+
super.setServerMetadata(service.getConfluenceServerMetadata());
7074
super.start(buffer);
7175
}
7276

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* The OpenSearch Contributors require contributions made to
6+
* this file be licensed under the Apache-2.0 license or a
7+
* compatible open source license.
8+
*
9+
*/
10+
11+
12+
package org.opensearch.dataprepper.plugins.source.confluence.models;
13+
14+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
15+
import com.fasterxml.jackson.annotation.JsonProperty;
16+
import lombok.Getter;
17+
import org.opensearch.dataprepper.plugins.source.confluence.utils.TimezoneHelper;
18+
import org.opensearch.dataprepper.plugins.source.source_crawler.base.SourceServerMetadata;
19+
import org.slf4j.Logger;
20+
import org.slf4j.LoggerFactory;
21+
22+
import java.time.Duration;
23+
import java.time.ZoneId;
24+
25+
/**
26+
* The result of a SystemInfo API call.
27+
*/
28+
@Getter
29+
@JsonIgnoreProperties(ignoreUnknown = true)
30+
public class ConfluenceServerMetadata implements SourceServerMetadata {
31+
32+
Logger log = LoggerFactory.getLogger(ConfluenceServerMetadata.class);
33+
34+
@JsonProperty("cloudId")
35+
private String cloudId = null;
36+
37+
@JsonProperty("defaultTimeZone")
38+
private ZoneId defaultTimeZone = ZoneId.of("UTC");
39+
40+
@Override
41+
public Duration getPollingTimezoneOffset() {
42+
Duration pollingTimezoneOffset = TimezoneHelper.getUTCTimezoneOffset(defaultTimeZone);
43+
log.info("Confluence server default timezone: {} with pollingTimezoneOffset: {}",
44+
defaultTimeZone, pollingTimezoneOffset);
45+
return pollingTimezoneOffset;
46+
}
47+
48+
}

data-prepper-plugins/saas-source-plugins/confluence-source/src/main/java/org/opensearch/dataprepper/plugins/source/confluence/rest/ConfluenceRestClient.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.opensearch.dataprepper.plugins.source.atlassian.rest.auth.AtlassianAuthConfig;
2020
import org.opensearch.dataprepper.plugins.source.confluence.models.ConfluencePaginationLinks;
2121
import org.opensearch.dataprepper.plugins.source.confluence.models.ConfluenceSearchResults;
22+
import org.opensearch.dataprepper.plugins.source.confluence.models.ConfluenceServerMetadata;
2223
import org.springframework.web.client.RestTemplate;
2324
import org.springframework.web.util.UriComponentsBuilder;
2425

@@ -36,6 +37,7 @@
3637
@Named
3738
public class ConfluenceRestClient extends AtlassianRestClient {
3839

40+
public static final String SYSTEM_INFO_API = "wiki/rest/api/settings/systemInfo";
3941
public static final String REST_API_SEARCH = "wiki/rest/api/content/search";
4042
public static final String REST_API_FETCH_CONTENT = "wiki/rest/api/content/";
4143
public static final String REST_API_CONTENT_EXPAND_PARAM = "?expand=body.view";
@@ -67,6 +69,12 @@ public ConfluenceRestClient(RestTemplate restTemplate, AtlassianAuthConfig authC
6769
contentRequestedCounter = pluginMetrics.counter(PAGES_REQUESTED);
6870
}
6971

72+
public ConfluenceServerMetadata getConfluenceServerMetadata() {
73+
URI uri = UriComponentsBuilder.fromHttpUrl(authConfig.getUrl() + SYSTEM_INFO_API)
74+
.buildAndExpand().toUri();
75+
return invokeRestApi(uri, ConfluenceServerMetadata.class).getBody();
76+
}
77+
7078
/**
7179
* Method to get all Contents in a paginated fashion.
7280
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package org.opensearch.dataprepper.plugins.source.confluence.utils;
2+
3+
import java.time.Duration;
4+
import java.time.LocalDateTime;
5+
import java.time.ZoneId;
6+
import java.time.ZonedDateTime;
7+
8+
public class TimezoneHelper {
9+
10+
public static Duration getUTCTimezoneOffset(ZoneId timezone) {
11+
return getTimezoneOffset(timezone, ZoneId.of("UTC"));
12+
}
13+
14+
public static Duration getTimezoneOffset(ZoneId timezone1, ZoneId timezone2) {
15+
// Get current instant
16+
LocalDateTime now = LocalDateTime.now();
17+
18+
// Get offsets for both zones
19+
ZonedDateTime zone1DateTime = now.atZone(timezone1);
20+
ZonedDateTime zone2DateTime = now.atZone(timezone2);
21+
22+
// Calculate difference
23+
return Duration.between(zone1DateTime, zone2DateTime);
24+
}
25+
}

data-prepper-plugins/saas-source-plugins/confluence-source/src/test/java/org/opensearch/dataprepper/plugins/source/confluence/ConfluenceSourceTest.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.opensearch.dataprepper.plugins.source.atlassian.configuration.AuthenticationConfig;
2525
import org.opensearch.dataprepper.plugins.source.atlassian.configuration.BasicConfig;
2626
import org.opensearch.dataprepper.plugins.source.atlassian.rest.auth.AtlassianAuthConfig;
27+
import org.opensearch.dataprepper.plugins.source.confluence.models.ConfluenceServerMetadata;
2728
import org.opensearch.dataprepper.plugins.source.source_crawler.base.Crawler;
2829
import org.opensearch.dataprepper.plugins.source.source_crawler.base.PluginExecutorServiceProvider;
2930

@@ -64,19 +65,24 @@ public class ConfluenceSourceTest {
6465
@Mock
6566
private PluginExecutorServiceProvider executorServiceProvider;
6667
@Mock
68+
private ConfluenceService service;
69+
@Mock
6770
private ExecutorService executorService;
71+
@Mock
72+
private ConfluenceServerMetadata serverMetadata;
6873

6974
@Test
7075
void initialization() {
7176
when(executorServiceProvider.get()).thenReturn(executorService);
72-
ConfluenceSource source = new ConfluenceSource(pluginMetrics, confluenceSourceConfig, jiraOauthConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider);
77+
ConfluenceSource source = new ConfluenceSource(pluginMetrics, confluenceSourceConfig, jiraOauthConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider, service);
7378
assertNotNull(source);
7479
}
7580

7681
@Test
7782
void testStart() {
7883
when(executorServiceProvider.get()).thenReturn(executorService);
79-
ConfluenceSource source = new ConfluenceSource(pluginMetrics, confluenceSourceConfig, jiraOauthConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider);
84+
when(service.getConfluenceServerMetadata()).thenReturn(serverMetadata);
85+
ConfluenceSource source = new ConfluenceSource(pluginMetrics, confluenceSourceConfig, jiraOauthConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider, service);
8086
when(confluenceSourceConfig.getAccountUrl()).thenReturn(ACCESSIBLE_RESOURCES);
8187
when(confluenceSourceConfig.getAuthType()).thenReturn(BASIC);
8288
when(confluenceSourceConfig.getAuthenticationConfig()).thenReturn(authenticationConfig);
@@ -92,7 +98,8 @@ void testStart() {
9298
@Test
9399
void testStop() {
94100
when(executorServiceProvider.get()).thenReturn(executorService);
95-
ConfluenceSource source = new ConfluenceSource(pluginMetrics, confluenceSourceConfig, jiraOauthConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider);
101+
when(service.getConfluenceServerMetadata()).thenReturn(serverMetadata);
102+
ConfluenceSource source = new ConfluenceSource(pluginMetrics, confluenceSourceConfig, jiraOauthConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider, service);
96103
when(confluenceSourceConfig.getAccountUrl()).thenReturn(ACCESSIBLE_RESOURCES);
97104
when(confluenceSourceConfig.getAuthType()).thenReturn(BASIC);
98105
when(confluenceSourceConfig.getAuthenticationConfig()).thenReturn(authenticationConfig);
@@ -109,7 +116,7 @@ void testStop() {
109116
@Test
110117
void testStop_WhenNotStarted() {
111118
when(executorServiceProvider.get()).thenReturn(executorService);
112-
ConfluenceSource source = new ConfluenceSource(pluginMetrics, confluenceSourceConfig, jiraOauthConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider);
119+
ConfluenceSource source = new ConfluenceSource(pluginMetrics, confluenceSourceConfig, jiraOauthConfig, pluginFactory, acknowledgementSetManager, crawler, executorServiceProvider, service);
113120

114121
source.stop();
115122

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package org.opensearch.dataprepper.plugins.source.confluence.utils;
2+
3+
import org.junit.jupiter.api.Test;
4+
import org.junit.jupiter.params.ParameterizedTest;
5+
import org.junit.jupiter.params.provider.Arguments;
6+
import org.junit.jupiter.params.provider.MethodSource;
7+
8+
import java.time.Duration;
9+
import java.time.LocalDateTime;
10+
import java.time.ZoneId;
11+
import java.time.ZonedDateTime;
12+
import java.util.stream.Stream;
13+
14+
import static org.junit.jupiter.api.Assertions.assertAll;
15+
import static org.junit.jupiter.api.Assertions.assertEquals;
16+
import static org.junit.jupiter.api.Assertions.assertThrows;
17+
import static org.junit.jupiter.api.Assertions.assertTrue;
18+
import static org.junit.jupiter.params.provider.Arguments.arguments;
19+
20+
public class TimezoneHelperTest {
21+
22+
static Stream<Arguments> timezoneOffsetTestCases() {
23+
return Stream.of(
24+
// Format: sourceZone, targetZone, expectedDuration
25+
26+
// Same timezone (should return zero)
27+
arguments(ZoneId.of("Europe/Paris"), ZoneId.of("Europe/Paris"), Duration.ofHours(0), Duration.ofHours(0)),
28+
29+
// Sydney to London (Sydney is ahead, so positive offset)
30+
arguments(ZoneId.of("Australia/Sydney"), ZoneId.of("Europe/London"), Duration.ofHours(11), Duration.ofHours(11)),
31+
32+
// New York to Tokyo (New York is behind, so negative offset)
33+
arguments(ZoneId.of("America/New_York"), ZoneId.of("Asia/Tokyo"), Duration.ofHours(-13), Duration.ofHours(-14)),
34+
35+
// Los Angeles to Berlin
36+
arguments(ZoneId.of("America/Los_Angeles"), ZoneId.of("Europe/Berlin"), Duration.ofHours(-8), Duration.ofHours(-9)),
37+
38+
// Auckland to Hawaii (crossing international date line)
39+
arguments(ZoneId.of("Pacific/Auckland"), ZoneId.of("Pacific/Honolulu"), Duration.ofHours(23), Duration.ofHours(23))
40+
);
41+
}
42+
43+
@ParameterizedTest
44+
@MethodSource("timezoneOffsetTestCases")
45+
void testGetTimezoneOffset(ZoneId sourceZone, ZoneId targetZone, Duration minDuration, Duration maxDuration) {
46+
Duration reverseOffset = TimezoneHelper.getTimezoneOffset(sourceZone, targetZone);
47+
// The Range check is because of the Day light saving time changes. Otherwise, this would have been equals check
48+
assertAll(
49+
() -> assertTrue(reverseOffset.compareTo(maxDuration) >= 0, "Timezone Offset should be at least " + minDuration),
50+
() -> assertTrue(reverseOffset.compareTo(maxDuration) <= 0, "Timezone Offset should be at most " + maxDuration)
51+
);
52+
}
53+
54+
55+
@Test
56+
public void testEdgeCases() {
57+
// Test with extreme timezone differences
58+
Duration samoaToHonoluluOffset = TimezoneHelper.getTimezoneOffset(
59+
ZoneId.of("Pacific/Apia"), // UTC+13/+14
60+
ZoneId.of("Pacific/Honolulu") // UTC-10
61+
);
62+
// The difference should be around 23-24 hours
63+
assertTrue(Math.abs(samoaToHonoluluOffset.toHours()) >= 23,
64+
"Samoa to Honolulu offset should be at least 23 hours");
65+
}
66+
67+
@Test
68+
public void testNullTimezone() {
69+
// Test with null timezone (should throw NullPointerException)
70+
assertThrows(NullPointerException.class, () -> TimezoneHelper.getUTCTimezoneOffset(null));
71+
assertThrows(NullPointerException.class, () -> TimezoneHelper.getTimezoneOffset(null, ZoneId.of("UTC")));
72+
assertThrows(NullPointerException.class, () -> TimezoneHelper.getTimezoneOffset(ZoneId.of("UTC"), null));
73+
}
74+
75+
@Test
76+
public void testConsistencyWithCurrentTime() {
77+
// This test verifies that the offset calculation is consistent with the current time
78+
ZoneId zone1 = ZoneId.of("Europe/Berlin");
79+
ZoneId zone2 = ZoneId.of("America/Los_Angeles");
80+
81+
// Calculate offset using our helper
82+
Duration calculatedOffset = TimezoneHelper.getTimezoneOffset(zone1, zone2);
83+
84+
// Calculate offset manually for verification
85+
LocalDateTime now = LocalDateTime.now();
86+
ZonedDateTime berlin = now.atZone(zone1);
87+
ZonedDateTime la = now.atZone(zone2);
88+
Duration expectedOffset = Duration.between(berlin, la);
89+
90+
assertEquals(expectedOffset.getSeconds(), calculatedOffset.getSeconds(),
91+
"Calculated offset should match expected offset");
92+
}
93+
}

data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/Crawler.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,18 @@ public Crawler(CrawlerClient client, PluginMetrics pluginMetrics) {
3939
}
4040

4141
public Instant crawl(LeaderPartition leaderPartition,
42-
EnhancedSourceCoordinator coordinator, int batchSize) {
42+
EnhancedSourceCoordinator coordinator, int batchSize, Duration pollingTimezoneOffset) {
4343
long startTime = System.currentTimeMillis();
4444
Instant lastLeaderSavedInstant = Instant.now();
4545
LeaderProgressState leaderProgressState = leaderPartition.getProgressState().get();
46+
// Leader state is always saved in UTC since the timestamps in the api response are always in UTC
4647
Instant lastPollTime = leaderProgressState.getLastPollTime();
47-
client.setLastPollTime(lastPollTime);
48+
// Adjust the time specific to the crawling source service setting before polling for changes
49+
Instant lastPollTimeWithOffsetAdjustment = lastPollTime.plusSeconds(pollingTimezoneOffset.toSeconds());
50+
client.setLastPollTime(lastPollTimeWithOffsetAdjustment);
4851
Iterator<ItemInfo> itemInfoIterator = client.listItems();
49-
Instant latestModifiedTime = Instant.from(lastPollTime);
50-
log.info("Starting to crawl the source with lastPollTime: {}", lastPollTime);
52+
Instant latestModifiedTime = lastPollTime;
53+
log.info("Starting to crawl the source with lastPollTime: {}", lastPollTimeWithOffsetAdjustment);
5154
do {
5255
final List<ItemInfo> itemInfoList = new ArrayList<>();
5356
for (int i = 0; i < batchSize && itemInfoIterator.hasNext(); i++) {

data-prepper-plugins/saas-source-plugins/source-crawler/src/main/java/org/opensearch/dataprepper/plugins/source/source_crawler/base/CrawlerSourcePlugin.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
import org.slf4j.Logger;
2121
import org.slf4j.LoggerFactory;
2222

23+
import java.time.Duration;
2324
import java.util.Objects;
25+
import java.util.Optional;
2426
import java.util.concurrent.ExecutorService;
2527
import java.util.function.Function;
2628

@@ -33,18 +35,16 @@ public abstract class CrawlerSourcePlugin implements Source<Record<Event>>, Uses
3335

3436

3537
private static final Logger log = LoggerFactory.getLogger(CrawlerSourcePlugin.class);
38+
private EnhancedSourceCoordinator coordinator;
39+
private Optional<SourceServerMetadata> serverMetadata = Optional.empty();
3640
private final PluginMetrics pluginMetrics;
3741
private final PluginFactory pluginFactory;
38-
3942
private final AcknowledgementSetManager acknowledgementSetManager;
40-
4143
private final ExecutorService executorService;
4244
private final CrawlerSourceConfig sourceConfig;
4345
private final Crawler crawler;
4446
private final String sourcePluginName;
4547
private final int batchSize;
46-
private EnhancedSourceCoordinator coordinator;
47-
private Buffer<Record<Event>> buffer;
4848

4949

5050
public CrawlerSourcePlugin(final String sourcePluginName,
@@ -66,17 +66,24 @@ public CrawlerSourcePlugin(final String sourcePluginName,
6666
this.executorService = executorServiceProvider.get();
6767
}
6868

69+
public void setServerMetadata(SourceServerMetadata serverMetadata) {
70+
this.serverMetadata = Optional.of(serverMetadata);
71+
}
72+
6973

7074
@Override
7175
public void start(Buffer<Record<Event>> buffer) {
7276
Objects.requireNonNull(coordinator);
7377
log.info("Starting {} Source Plugin", sourcePluginName);
74-
this.buffer = buffer;
78+
Duration timezoneOffset = Duration.ofSeconds(0);
7579

7680
boolean isPartitionCreated = coordinator.createPartition(new LeaderPartition());
7781
log.debug("Leader partition creation status: {}", isPartitionCreated);
82+
if (serverMetadata.isPresent()) {
83+
timezoneOffset = serverMetadata.get().getPollingTimezoneOffset();
84+
}
7885

79-
Runnable leaderScheduler = new LeaderScheduler(coordinator, this, crawler, batchSize);
86+
Runnable leaderScheduler = new LeaderScheduler(coordinator, crawler, batchSize, timezoneOffset);
8087
this.executorService.submit(leaderScheduler);
8188
//Register worker threaders
8289
for (int i = 0; i < sourceConfig.DEFAULT_NUMBER_OF_WORKERS; i++) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package org.opensearch.dataprepper.plugins.source.source_crawler.base;
2+
3+
import java.time.Duration;
4+
5+
/**
6+
* Marker interface to all the SAAS connectors source server configuration metadata
7+
*/
8+
public interface SourceServerMetadata {
9+
10+
/**
11+
* Returns the polling timezone offset
12+
*
13+
* @return the polling timezone offset
14+
*/
15+
Duration getPollingTimezoneOffset();
16+
}

0 commit comments

Comments
 (0)