Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions Cognite.Extensions/TimeSeries/DataPointExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -466,13 +466,15 @@ private static async Task<HashSet<Identity>> DeleteDataPointsIgnoreErrorsChunk(
/// <param name="chunkSize">Number of timeseries per request</param>
/// <param name="throttleSize">Maximum number of parallel requests</param>
/// <param name="token">Cancellation token</param>
/// <param name="ignoreBadDataPoints">Ignore bad status datapoints, true by default</param>
/// <returns>Dictionary from externalId to last timestamp, only contains existing timeseries</returns>
public static async Task<IDictionary<Identity, DateTime>> GetLatestTimestamps(
this DataPointsResource dataPoints,
IEnumerable<(Identity id, DateTime before)> ids,
int chunkSize,
int throttleSize,
CancellationToken token)
CancellationToken token,
bool ignoreBadDataPoints = true)
{
var ret = new ConcurrentDictionary<Identity, DateTime>();
var idSet = new HashSet<Identity>(ids.Select(id => id.id));
Expand All @@ -497,6 +499,7 @@ public static async Task<IDictionary<Identity, DateTime>> GetLatestTimestamps(
{
idt.Before = pair.before.ToUnixTimeMilliseconds().ToString();
}
idt.IgnoreBadDataPoints = ignoreBadDataPoints;
return idt;
})
.ChunkBy(chunkSize)
Expand Down Expand Up @@ -558,21 +561,23 @@ await generators
/// <param name="chunkSize">Number of timeseries per request</param>
/// <param name="throttleSize">Maximum number of parallel requests</param>
/// <param name="token">Cancellation token</param>
/// <param name="ignoreBadDataPoints">Ignore bad status datapoints, true by default</param>
/// <returns>Dictionary from externalId to first timestamp, only contains existing timeseries</returns>
public static async Task<IDictionary<Identity, DateTime>> GetEarliestTimestamps(
this DataPointsResource dataPoints,
IEnumerable<(Identity id, DateTime after)> ids,
int chunkSize,
int throttleSize,
CancellationToken token)
CancellationToken token,
bool ignoreBadDataPoints = true)
{
var ret = new ConcurrentDictionary<Identity, DateTime>();
var idSet = new HashSet<Identity>(ids.Select(id => id.id));

var chunks = ids
.Select((pair) =>
{
var query = new DataPointsQueryItem();
var query = new DataPointsQueryItem() { IgnoreBadDataPoints = ignoreBadDataPoints };
if (pair.id.Id.HasValue)
{
query.Id = pair.id.Id.Value;
Expand Down Expand Up @@ -660,6 +665,7 @@ await generators
/// <param name="latest">If true, fetch latest timestamps</param>
/// <param name="earliest">If true, fetch earliest timestamps</param>
/// <param name="token">Cancellation token</param>
/// <param name="ignoreBadDataPoints">Ignore bad status datapoints, true by default</param>
/// <returns></returns>
public static async Task<IDictionary<Identity, TimeRange>> GetExtractedRanges(
this DataPointsResource dataPoints,
Expand All @@ -669,7 +675,8 @@ public static async Task<IDictionary<Identity, TimeRange>> GetExtractedRanges(
int throttleSize,
bool latest,
bool earliest,
CancellationToken token)
CancellationToken token,
bool ignoreBadDataPoints = true)
{
if (ids == null)
{
Expand All @@ -684,12 +691,12 @@ public static async Task<IDictionary<Identity, TimeRange>> GetExtractedRanges(
if (latest)
{
tasks.Add(dataPoints.GetLatestTimestamps(ids.Select(pair => (pair.id, pair.limit?.Last ?? DateTime.MaxValue)),
chunkSizeLatest, throttleSize, token));
chunkSizeLatest, throttleSize, token, ignoreBadDataPoints));
}
if (earliest)
{
tasks.Add(dataPoints.GetEarliestTimestamps(ids.Select(pair => (pair.id, pair.limit?.First ?? CogniteTime.DateTimeEpoch)),
chunkSizeEarliest, throttleSize, token));
chunkSizeEarliest, throttleSize, token, ignoreBadDataPoints));
}
var results = await Task.WhenAll(tasks).ConfigureAwait(false);
if (latest)
Expand Down
14 changes: 10 additions & 4 deletions ExtractorUtils/Cognite/CogniteDestination.cs
Original file line number Diff line number Diff line change
Expand Up @@ -719,12 +719,14 @@ public Task DeleteRowsAsync(string dbName, string tableName, IEnumerable<string>
/// <param name="token">Cancellation token</param>
/// <param name="earliest">If true, fetch earliest timestamps, default true</param>
/// <param name="latest">If true, fetch latest timestamps, default true</param>
/// <param name="ignoreBadDataPoints">Ignore bad status datapoints, true by default</param>
/// <returns></returns>
public Task<IDictionary<Identity, TimeRange>> GetExtractedRanges(
IEnumerable<Identity> ids,
CancellationToken token,
bool earliest = true,
bool latest = true
bool latest = true,
bool ignoreBadDataPoints = true
)
{
return _client.DataPoints.GetExtractedRanges(
Expand All @@ -734,7 +736,8 @@ public Task<IDictionary<Identity, TimeRange>> GetExtractedRanges(
_throttling.Ranges,
latest,
earliest,
token);
token,
ignoreBadDataPoints);
}


Expand All @@ -747,12 +750,14 @@ public Task<IDictionary<Identity, TimeRange>> GetExtractedRanges(
/// <param name="token">Cancellation token</param>
/// <param name="earliest">If true, fetch earliest timestamps, default true</param>
/// <param name="latest">If true, fetch latest timestamps, default true</param>
/// <param name="ignoreBadDataPoints">Ignore bad status datapoints, true by default</param>
/// <returns></returns>
public Task<IDictionary<Identity, TimeRange>> GetExtractedRanges(
IEnumerable<(Identity id, TimeRange limit)> ids,
CancellationToken token,
bool earliest = true,
bool latest = true
bool latest = true,
bool ignoreBadDataPoints = true
)
{
return _client.DataPoints.GetExtractedRanges(
Expand All @@ -762,7 +767,8 @@ public Task<IDictionary<Identity, TimeRange>> GetExtractedRanges(
_throttling.Ranges,
latest,
earliest,
token);
token,
ignoreBadDataPoints);
}


Expand Down
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.39.1
1.40.0