diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/BasePerformanceEntry.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/BasePerformanceEntry.java
index 02271b9cae4..96cb58547f6 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/BasePerformanceEntry.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/BasePerformanceEntry.java
@@ -16,6 +16,7 @@
* A smaller entry that simply records usage data, meant for aggregating into the larger entry.
*/
public class BasePerformanceEntry implements LogOutputAppendable {
+
private long usageNanos;
private long cpuNanos;
@@ -24,6 +25,12 @@ public class BasePerformanceEntry implements LogOutputAppendable {
private long allocatedBytes;
private long poolAllocatedBytes;
+ private long dataReadNanos;
+ private long dataReadCount;
+ private long dataReadBytes;
+ private long metadataReadNanos;
+ private long metadataReadCount;
+
private long startTimeNanos;
private long startCpuNanos;
@@ -32,10 +39,25 @@ public class BasePerformanceEntry implements LogOutputAppendable {
private long startAllocatedBytes;
private long startPoolAllocatedBytes;
+ private long startDataReadNanos;
+ private long startDataReadCount;
+ private long startDataReadBytes;
+ private long startMetadataReadNanos;
+ private long startMetadataReadCount;
+
public synchronized void onBaseEntryStart() {
startAllocatedBytes = ThreadProfiler.DEFAULT.getCurrentThreadAllocatedBytes();
startPoolAllocatedBytes = QueryPerformanceRecorderState.getPoolAllocatedBytesForCurrentThread();
+ final QueryPerformanceRecorderState.ReadTracker readTracker =
+ QueryPerformanceRecorderState.getReadTrackerForCurrentThread();
+
+ startDataReadNanos = readTracker.getDataReadNanos();
+ startDataReadCount = readTracker.getDataReadCount();
+ startDataReadBytes = readTracker.getDataReadBytes();
+ startMetadataReadNanos = readTracker.getMetadataReadNano();
+ startMetadataReadCount = readTracker.getMetadataReadCount();
+
startUserCpuNanos = ThreadProfiler.DEFAULT.getCurrentThreadUserTime();
startCpuNanos = ThreadProfiler.DEFAULT.getCurrentThreadCpuTime();
startTimeNanos = System.nanoTime();
@@ -54,12 +76,27 @@ public synchronized void onBaseEntryEnd() {
allocatedBytes = plus(allocatedBytes,
minus(ThreadProfiler.DEFAULT.getCurrentThreadAllocatedBytes(), startAllocatedBytes));
+ final QueryPerformanceRecorderState.ReadTracker readTracker =
+ QueryPerformanceRecorderState.getReadTrackerForCurrentThread();
+
+ dataReadNanos += readTracker.getDataReadNanos() - startDataReadNanos;
+ dataReadCount += readTracker.getDataReadCount() - startDataReadCount;
+ dataReadBytes += readTracker.getDataReadBytes() - startDataReadBytes;
+ metadataReadNanos += readTracker.getMetadataReadNano() - startMetadataReadNanos;
+ metadataReadCount += readTracker.getMetadataReadCount() - startMetadataReadCount;
+
startAllocatedBytes = 0;
startPoolAllocatedBytes = 0;
startUserCpuNanos = 0;
startCpuNanos = 0;
startTimeNanos = 0;
+
+ startDataReadNanos = 0;
+ startDataReadCount = 0;
+ startDataReadBytes = 0;
+ startMetadataReadNanos = 0;
+ startMetadataReadCount = 0;
}
synchronized void baseEntryReset() {
@@ -72,6 +109,18 @@ synchronized void baseEntryReset() {
allocatedBytes = 0;
poolAllocatedBytes = 0;
+
+ dataReadNanos = 0;
+ dataReadCount = 0;
+ dataReadBytes = 0;
+ metadataReadNanos = 0;
+ metadataReadCount = 0;
+
+ startDataReadNanos = 0;
+ startDataReadCount = 0;
+ startDataReadBytes = 0;
+ startMetadataReadNanos = 0;
+ startMetadataReadCount = 0;
}
/**
@@ -124,6 +173,58 @@ public long getPoolAllocatedBytes() {
return poolAllocatedBytes;
}
+ /**
+ * Get the aggregate time spent reading data in nanoseconds. This getter should be called by exclusive owners of the
+ * entry, and never concurrently with mutators.
+ *
+ * @return total data read time in nanos
+ */
+ public long getDataReadNanos() {
+ return dataReadNanos;
+ }
+
+ /**
+ * Get the aggregate number of data read operations. This getter should be called by exclusive owners of the entry,
+ * and never concurrently with mutators.
+ *
+ * @return total number of data read operations
+ */
+ public long getDataReadCount() {
+ return dataReadCount;
+ }
+
+ /**
+ * Get the aggregate number of bytes read in data read operations. This getter should be called by exclusive owners
+ * of the entry, and never concurrently with mutators.
+ *
+ * @return total number of bytes read
+ */
+ public long getDataReadBytes() {
+ return dataReadBytes;
+ }
+
+ /**
+ * Get the aggregate time spent on metadata operations (e.g. listing files, checking existence, determining file
+ * sizes) in nanoseconds. This getter should be called by exclusive owners of the entry, and never concurrently with
+ * mutators.
+ *
+ * @return total metadata operation time in nanos
+ */
+ public long getMetadataReadNanos() {
+ return metadataReadNanos;
+ }
+
+ /**
+ * Get the aggregate number of metadata operations. This getter should be called by exclusive owners of the entry,
+ * and never concurrently with mutators.
+ *
+ * @return total number of metadata operations
+ */
+ public long getMetadataReadCount() {
+ return metadataReadCount;
+ }
+
+
@Override
public LogOutput append(@NotNull final LogOutput logOutput) {
final LogOutput currentValues = logOutput.append("BasePerformanceEntry{")
@@ -131,7 +232,12 @@ public LogOutput append(@NotNull final LogOutput logOutput) {
.append(", intervalCpuNanos=").append(cpuNanos)
.append(", intervalUserCpuNanos=").append(userCpuNanos)
.append(", intervalAllocatedBytes=").append(allocatedBytes)
- .append(", intervalPoolAllocatedBytes=").append(poolAllocatedBytes);
+ .append(", intervalPoolAllocatedBytes=").append(poolAllocatedBytes)
+                .append(", intervalDataReadNanos=").append(dataReadNanos)
+                .append(", intervalDataReadCount=").append(dataReadCount)
+                .append(", intervalDataReadBytes=").append(dataReadBytes)
+                .append(", intervalMetadataReadNanos=").append(metadataReadNanos)
+                .append(", intervalMetadataReadCount=").append(metadataReadCount);
return appendStart(currentValues)
.append('}');
}
@@ -157,5 +263,11 @@ public synchronized void accumulate(@NotNull final BasePerformanceEntry entry) {
this.allocatedBytes = plus(this.allocatedBytes, entry.allocatedBytes);
this.poolAllocatedBytes = plus(this.poolAllocatedBytes, entry.poolAllocatedBytes);
+
+ this.dataReadNanos += entry.dataReadNanos;
+ this.dataReadCount += entry.dataReadCount;
+ this.dataReadBytes += entry.dataReadBytes;
+ this.metadataReadNanos += entry.metadataReadNanos;
+ this.metadataReadCount += entry.metadataReadCount;
}
}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/PerformanceEntry.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/PerformanceEntry.java
index 7126732f2d3..c5a79f53c2c 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/PerformanceEntry.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/PerformanceEntry.java
@@ -237,7 +237,8 @@ boolean shouldLogEntryInterval() {
loggedOnce = true;
return true;
}
- return invocationCount > 0 && UpdatePerformanceTracker.LOG_THRESHOLD.shouldLog(getUsageNanos());
+ return invocationCount > 0
+ && UpdatePerformanceTracker.LOG_THRESHOLD.shouldLog(getUsageNanos(), getDataReadCount());
}
public void accumulate(PerformanceEntry entry) {
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceLogThreshold.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceLogThreshold.java
index 670b5d298cc..c41b3fee6a0 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceLogThreshold.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceLogThreshold.java
@@ -11,9 +11,10 @@
*/
public class QueryPerformanceLogThreshold {
private final long minimumDurationNanos;
+ private final long minimumReadCount;
/**
- * Create a log threshold object for a particular kind of log update
+ * Create a log threshold object for a particular kind of log update.
*
* - "" is for instrumented QueryPerformanceLog/QueryOperationPerformanceLog nuggets
* - "Uninstrumented" is for uninstrumented QueryPerformanceLog/QueryOperationPerformanceLog nuggets, and
@@ -22,50 +23,60 @@ public class QueryPerformanceLogThreshold {
*
* @param kind kind of update to derive property names
* @param defaultDuration default value for duration nanos
- * @param defaultRepeatedReads default value for repeated read threshold
- * @param defaultInitialReads default value for initial read threshold
+ * @param defaultReadCount default value for the minimum read count threshold
*/
- private QueryPerformanceLogThreshold(String kind, long defaultDuration, long defaultRepeatedReads,
- long defaultInitialReads) {
+ public QueryPerformanceLogThreshold(String kind, long defaultDuration, long defaultReadCount) {
minimumDurationNanos = Configuration.getInstance()
.getLongWithDefault("QueryPerformance.minimum" + kind + "LogDurationNanos", defaultDuration);
+ minimumReadCount = Configuration.getInstance()
+ .getLongWithDefault("QueryPerformance.minimum" + kind + "LogReadCount", defaultReadCount);
}
/**
- * Create a log threshold object for a particular kind of log update
+ * Create a log threshold object for a particular kind of log update. The minimum read count defaults to 1.
*
* - "" is for instrumented QueryPerformanceLog/QueryOperationPerformanceLog nuggets
* - "Uninstrumented" is for uninstrumented QueryPerformanceLog/QueryOperationPerformanceLog nuggets, and
*
- "Update" is for UpdatePerformanceLog entry intervals.
*
*
- * The initial and repeated read threshold defaults to 1.
- *
* @param kind kind of update to derive property names
* @param defaultDuration default value for duration nanos
*/
public QueryPerformanceLogThreshold(String kind, long defaultDuration) {
- this(kind, defaultDuration, 1, 1);
+ this(kind, defaultDuration, 1);
}
/**
- * The minimum duration for an QueryPerformanceNugget to be logged based on its duration (or entry interval usage
- * for the UpdatePerformanceLog). The value 0 logs everything. The value -1 will not log anything based on duration.
+ * The minimum duration for a QueryPerformanceNugget to be logged based on its duration (or entry interval usage for
+ * the UpdatePerformanceLog). The value 0 logs everything. The value -1 will not log anything based on duration.
*/
private long getMinimumDurationNanos() {
return minimumDurationNanos;
}
+ /**
+ * The minimum data read count for a QueryPerformanceNugget to be logged. The value 0 means that reads alone will
+ * never trigger logging. The default value of 1 means any read will trigger logging.
+ */
+ private long getMinimumReadCount() {
+ return minimumReadCount;
+ }
+
/**
* Should this item be logged?
*
* @param duration the duration (or usage) of the item
+ * @param dataReadCount the number of data read operations performed
* @return true if the item exceeds our logging threshold, and thus should be logged
*/
- public boolean shouldLog(final long duration) {
+ public boolean shouldLog(final long duration, final long dataReadCount) {
if (getMinimumDurationNanos() >= 0 && duration >= getMinimumDurationNanos()) {
return true;
}
+ if (getMinimumReadCount() > 0 && dataReadCount >= getMinimumReadCount()) {
+ return true;
+ }
return false;
}
}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceNugget.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceNugget.java
index c3cb6232643..d2c1a98a285 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceNugget.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceNugget.java
@@ -29,7 +29,7 @@ public class QueryPerformanceNugget extends BasePerformanceEntry implements Safe
private static final int MAX_DESCRIPTION_LENGTH = 16 << 10;
/**
- * A re-usable "dummy" nugget which will never collect any information or be recorded.
+ * A re-usable "dummy" nugget that will never collect any information or be recorded.
*/
static final QueryPerformanceNugget DUMMY_NUGGET = new QueryPerformanceNugget() {
@Override
@@ -37,6 +37,7 @@ public void accumulate(@NotNull BasePerformanceEntry entry) {
// non-synchronized no-op override
}
+
@Override
public boolean shouldLog() {
return false;
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorderImpl.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorderImpl.java
index 5294d39aafd..086cc675b6f 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorderImpl.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorderImpl.java
@@ -290,9 +290,9 @@ private boolean shouldLogNugget(@NotNull QueryPerformanceNugget nugget) {
// abnormal condition and the nugget should be logged
return true;
} else if (nugget == catchAllNugget) {
- return UNINSTRUMENTED_LOG_THRESHOLD.shouldLog(nugget.getUsageNanos());
+ return UNINSTRUMENTED_LOG_THRESHOLD.shouldLog(nugget.getUsageNanos(), nugget.getDataReadCount());
} else {
- return LOG_THRESHOLD.shouldLog(nugget.getUsageNanos());
+ return LOG_THRESHOLD.shouldLog(nugget.getUsageNanos(), nugget.getDataReadCount());
}
}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorderState.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorderState.java
index a27e3c33044..b21a9f56385 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorderState.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/QueryPerformanceRecorderState.java
@@ -3,6 +3,9 @@
//
package io.deephaven.engine.table.impl.perf;
+import io.deephaven.base.stats.Item;
+import io.deephaven.base.stats.State;
+import io.deephaven.base.stats.Stats;
import io.deephaven.chunk.util.pools.ChunkPoolInstrumentation;
import io.deephaven.configuration.Configuration;
import io.deephaven.engine.updategraph.UpdateGraphLock;
@@ -23,6 +26,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
@@ -42,6 +46,20 @@ public abstract class QueryPerformanceRecorderState {
? 0L
: io.deephaven.util.QueryConstants.NULL_LONG));
+ private static final String METADATA_OP_STATS_GROUP = "MetadataOp";
+
+ /**
+ * Cache of Stats items keyed by metadata operation type. Avoids repeated synchronized lookup in
+ * {@link Stats#makeItem} on every I/O operation.
+ */
+    private static final ConcurrentHashMap<String, Item<State>> METADATA_OP_STATS = new ConcurrentHashMap<>();
+
+ /**
+ * An object that contains the cumulative data read and metadata read counters for the current thread.
+ */
+    private static final ThreadLocal<ReadTracker> READ_TRACKER = ThreadLocal.withInitial(ReadTracker::new);
+
+
static {
// initialize the packages to skip when determining the callsite
@@ -140,6 +158,45 @@ static long getPoolAllocatedBytesForCurrentThread() {
return POOL_ALLOCATED_BYTES.get().get();
}
+ /**
+ * Record a data read operation. Accumulates into the current thread's cumulative counters.
+ *
+ * @param nanos time spent on the read in nanoseconds
+ * @param bytesRead number of bytes read
+ */
+ public static void recordRead(final long nanos, final long bytesRead) {
+ if (bytesRead < 0) {
+ return;
+ }
+ READ_TRACKER.get().recordRead(nanos, bytesRead);
+ }
+
+ /**
+ * Record a metadata operation (e.g. listing files, checking existence, determining file sizes). Accumulates into
+ * the current thread's cumulative counters and records a Stats histogram sample.
+ *
+ * @param type the type of metadata operation (e.g. "exists", "list", "walk", "size")
+ * @param nanos time spent on the metadata operation in nanoseconds
+ */
+ public static void recordMetadataOperation(
+ @NotNull final String type, final long nanos) {
+ READ_TRACKER.get().recordMeta(nanos);
+ METADATA_OP_STATS.computeIfAbsent(type,
+ t -> Stats.makeItem(METADATA_OP_STATS_GROUP, t, State.FACTORY,
+ "Metadata operation timing (nanos) for type: " + t))
+ .getValue().sample(nanos);
+ }
+
+ /**
+ * Gets the tracker for the current thread. The tracker is not thread-safe and is mutable. You must capture any
+ * values from the tracker rather than holding a reference to the returned object.
+ *
+ * @return the tracker for the current thread
+ */
+ public static ReadTracker getReadTrackerForCurrentThread() {
+ return READ_TRACKER.get();
+ }
+
/**
* See {@link QueryPerformanceRecorder#getCallerLine()}.
*/
@@ -290,4 +347,72 @@ public void abortQuery() {
throw new UnsupportedOperationException("Dummy recorder does not support abortQuery()");
}
}
+
+ /**
+ * Thread local read tracker; no synchronization is required because multiple threads are never permitted to access
+ * it simultaneously.
+ */
+ public static class ReadTracker {
+ long readCount;
+ long readNanos;
+ long readBytes;
+ long metadataReadCount;
+ long metadataReadNanos;
+
+ private void recordRead(long nanos, long bytesRead) {
+ readCount++;
+ readNanos += nanos;
+ readBytes += bytesRead;
+ }
+
+ private void recordMeta(long nanos) {
+ metadataReadCount++;
+ metadataReadNanos += nanos;
+ }
+
+ /**
+ * Get the cumulative data read nanos for this tracker.
+ *
+ * @return total data read nanos accumulated for this tracker
+ */
+ public long getDataReadNanos() {
+ return readNanos;
+ }
+
+ /**
+ * Get the cumulative data read count for this tracker.
+ *
+ * @return total data read count accumulated on this tracker
+ */
+ long getDataReadCount() {
+ return readCount;
+ }
+
+ /**
+ * Get the cumulative data read bytes for this tracker.
+ *
+ * @return total data read bytes accumulated on this tracker
+ */
+ public long getDataReadBytes() {
+ return readBytes;
+ }
+
+ /**
+ * Get the cumulative metadata read nanos for this tracker.
+ *
+ * @return total metadata read nanos accumulated on this tracker
+ */
+ long getMetadataReadNano() {
+ return metadataReadNanos;
+ }
+
+ /**
+ * Get the cumulative metadata read count for this tracker.
+ *
+ * @return total metadata read count accumulated on this tracker
+ */
+ long getMetadataReadCount() {
+ return metadataReadCount;
+ }
+ }
}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/UpdatePerformanceStreamPublisher.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/UpdatePerformanceStreamPublisher.java
index c841f896da5..f81fd7a3189 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/UpdatePerformanceStreamPublisher.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/perf/UpdatePerformanceStreamPublisher.java
@@ -41,6 +41,11 @@ class UpdatePerformanceStreamPublisher implements StreamPublisher {
ColumnDefinition.ofLong("CollectionTimeNanos"),
ColumnDefinition.ofLong("AllocatedBytes"),
ColumnDefinition.ofLong("PoolAllocatedBytes"),
+ ColumnDefinition.ofLong("DataReadNanos"),
+ ColumnDefinition.ofLong("DataReadCount"),
+ ColumnDefinition.ofLong("DataReadBytes"),
+ ColumnDefinition.ofLong("MetadataReadNanos"),
+ ColumnDefinition.ofLong("MetadataReadCount"),
ColumnDefinition.ofString("AuthContext"),
ColumnDefinition.ofString("UpdateGraph"),
ColumnDefinition.ofLong("WorkerHeapSize"));
@@ -69,54 +74,65 @@ public void register(@NotNull StreamConsumer consumer) {
}
public synchronized void add(IntervalLevelDetails intervalLevelDetails, PerformanceEntry performanceEntry) {
- // ColumnDefinition.ofInt("EntryId"),
- chunks[0].asWritableLongChunk().add(performanceEntry.getId());
+ int ci = 0;
+ // ColumnDefinition.ofLong("EntryId"),
+ chunks[ci++].asWritableLongChunk().add(performanceEntry.getId());
// ColumnDefinition.ofLong("EvaluationNumber"),
- chunks[1].asWritableLongChunk().add(performanceEntry.getEvaluationNumber());
+ chunks[ci++].asWritableLongChunk().add(performanceEntry.getEvaluationNumber());
// ColumnDefinition.ofInt("OperationNumber"),
- chunks[2].asWritableIntChunk().add(performanceEntry.getOperationNumber());
+ chunks[ci++].asWritableIntChunk().add(performanceEntry.getOperationNumber());
// ColumnDefinition.ofString("EntryDescription"),
- chunks[3].asWritableObjectChunk().add(performanceEntry.getDescription());
+ chunks[ci++].asWritableObjectChunk().add(performanceEntry.getDescription());
// ColumnDefinition.ofString("EntryCallerLine"),
- chunks[4].asWritableObjectChunk().add(performanceEntry.getCallerLine());
+ chunks[ci++].asWritableObjectChunk().add(performanceEntry.getCallerLine());
// ColumnDefinition.ofTime("IntervalStartTime"),
- chunks[5].asWritableLongChunk().add(intervalLevelDetails.getIntervalStartTimeEpochNanos());
+ chunks[ci++].asWritableLongChunk().add(intervalLevelDetails.getIntervalStartTimeEpochNanos());
// ColumnDefinition.ofTime("IntervalEndTime"),
- chunks[6].asWritableLongChunk().add(intervalLevelDetails.getIntervalEndTimeEpochNanos());
+ chunks[ci++].asWritableLongChunk().add(intervalLevelDetails.getIntervalEndTimeEpochNanos());
// ColumnDefinition.ofLong("UsageNanos"),
- chunks[7].asWritableLongChunk().add(performanceEntry.getUsageNanos());
+ chunks[ci++].asWritableLongChunk().add(performanceEntry.getUsageNanos());
// ColumnDefinition.ofLong("CpuNanos"),
- chunks[8].asWritableLongChunk().add(performanceEntry.getCpuNanos());
+ chunks[ci++].asWritableLongChunk().add(performanceEntry.getCpuNanos());
// ColumnDefinition.ofLong("UserCpuNanos"),
- chunks[9].asWritableLongChunk().add(performanceEntry.getUserCpuNanos());
+ chunks[ci++].asWritableLongChunk().add(performanceEntry.getUserCpuNanos());
// ColumnDefinition.ofLong("RowsAdded"),
- chunks[10].asWritableLongChunk().add(performanceEntry.getRowsAdded());
+ chunks[ci++].asWritableLongChunk().add(performanceEntry.getRowsAdded());
// ColumnDefinition.ofLong("RowsRemoved"),
- chunks[11].asWritableLongChunk().add(performanceEntry.getRowsRemoved());
+ chunks[ci++].asWritableLongChunk().add(performanceEntry.getRowsRemoved());
// ColumnDefinition.ofLong("RowsModified"),
- chunks[12].asWritableLongChunk().add(performanceEntry.getRowsModified());
+ chunks[ci++].asWritableLongChunk().add(performanceEntry.getRowsModified());
// ColumnDefinition.ofLong("RowsShifted"),
- chunks[13].asWritableLongChunk().add(performanceEntry.getRowsShifted());
+ chunks[ci++].asWritableLongChunk().add(performanceEntry.getRowsShifted());
// ColumnDefinition.ofLong("InvocationCount"),
- chunks[14].asWritableLongChunk().add(performanceEntry.getInvocationCount());
+ chunks[ci++].asWritableLongChunk().add(performanceEntry.getInvocationCount());
// ColumnDefinition.ofLong("MinFreeMemory"),
- chunks[15].asWritableLongChunk().add(performanceEntry.getMinFreeMemory());
+ chunks[ci++].asWritableLongChunk().add(performanceEntry.getMinFreeMemory());
// ColumnDefinition.ofLong("MaxTotalMemory"),
- chunks[16].asWritableLongChunk().add(performanceEntry.getMaxTotalMemory());
+ chunks[ci++].asWritableLongChunk().add(performanceEntry.getMaxTotalMemory());
// ColumnDefinition.ofLong("Collections"),
- chunks[17].asWritableLongChunk().add(performanceEntry.getCollections());
+ chunks[ci++].asWritableLongChunk().add(performanceEntry.getCollections());
// ColumnDefinition.ofLong("CollectionTimeNanos"),
- chunks[18].asWritableLongChunk().add(performanceEntry.getCollectionTimeNanos());
+ chunks[ci++].asWritableLongChunk().add(performanceEntry.getCollectionTimeNanos());
// ColumnDefinition.ofLong("AllocatedBytes"),
- chunks[19].asWritableLongChunk().add(performanceEntry.getAllocatedBytes());
+ chunks[ci++].asWritableLongChunk().add(performanceEntry.getAllocatedBytes());
// ColumnDefinition.ofLong("PoolAllocatedBytes"),
- chunks[20].asWritableLongChunk().add(performanceEntry.getPoolAllocatedBytes());
+ chunks[ci++].asWritableLongChunk().add(performanceEntry.getPoolAllocatedBytes());
+ // ColumnDefinition.ofLong("DataReadNanos"),
+ chunks[ci++].asWritableLongChunk().add(performanceEntry.getDataReadNanos());
+ // ColumnDefinition.ofLong("DataReadCount"),
+ chunks[ci++].asWritableLongChunk().add(performanceEntry.getDataReadCount());
+ // ColumnDefinition.ofLong("DataReadBytes"),
+ chunks[ci++].asWritableLongChunk().add(performanceEntry.getDataReadBytes());
+ // ColumnDefinition.ofLong("MetadataReadNanos"),
+ chunks[ci++].asWritableLongChunk().add(performanceEntry.getMetadataReadNanos());
+ // ColumnDefinition.ofLong("MetadataReadCount"),
+ chunks[ci++].asWritableLongChunk().add(performanceEntry.getMetadataReadCount());
// ColumnDefinition.ofString("AuthContext"),
- chunks[21].asWritableObjectChunk().add(Objects.toString(performanceEntry.getAuthContext()));
+ chunks[ci++].asWritableObjectChunk().add(Objects.toString(performanceEntry.getAuthContext()));
// ColumnDefinition.ofString("UpdateGraph"));
- chunks[22].asWritableObjectChunk().add(Objects.toString(performanceEntry.getUpdateGraphName()));
+ chunks[ci++].asWritableObjectChunk().add(Objects.toString(performanceEntry.getUpdateGraphName()));
// ColumnDefinition.ofLong("WorkerHeapSize")
- chunks[23].asWritableLongChunk().add(heapSize);
+ chunks[ci++].asWritableLongChunk().add(heapSize);
if (chunks[0].size() == CHUNK_SIZE) {
flushInternal();
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryOperationPerformanceStreamPublisher.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryOperationPerformanceStreamPublisher.java
index 9b6acdd6e51..c121dc087df 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryOperationPerformanceStreamPublisher.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryOperationPerformanceStreamPublisher.java
@@ -43,6 +43,11 @@ class QueryOperationPerformanceStreamPublisher implements StreamPublisher {
ColumnDefinition.ofLong("AllocatedBytes"),
ColumnDefinition.ofLong("PoolAllocatedBytes"),
ColumnDefinition.ofLong("InputSizeLong"),
+ ColumnDefinition.ofLong("DataReadNanos"),
+ ColumnDefinition.ofLong("DataReadCount"),
+ ColumnDefinition.ofLong("DataReadBytes"),
+ ColumnDefinition.ofLong("MetadataReadNanos"),
+ ColumnDefinition.ofLong("MetadataReadCount"),
ColumnDefinition.ofBoolean("WasInterrupted"),
ColumnDefinition.ofString("AuthContext"),
ColumnDefinition.ofLong("WorkerHeapSize"));
@@ -70,84 +75,100 @@ public void register(@NotNull StreamConsumer consumer) {
}
public synchronized void add(final QueryPerformanceNugget nugget) {
+ int chunkIdx = 0;
// ColumnDefinition.ofLong("EvaluationNumber"),
- chunks[0].asWritableLongChunk().add(nugget.getEvaluationNumber());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getEvaluationNumber());
// ColumnDefinition.ofLong("ParentEvaluationNumber"),
- chunks[1].asWritableLongChunk().add(nugget.getParentEvaluationNumber());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getParentEvaluationNumber());
// ColumnDefinition.ofInt("OperationNumber"),
- chunks[2].asWritableIntChunk().add(nugget.getOperationNumber());
+ chunks[chunkIdx++].asWritableIntChunk().add(nugget.getOperationNumber());
// ColumnDefinition.ofInt("ParentOperationNumber"),
- chunks[3].asWritableIntChunk().add(nugget.getParentOperationNumber());
+ chunks[chunkIdx++].asWritableIntChunk().add(nugget.getParentOperationNumber());
// ColumnDefinition.ofInt("Depth"),
- chunks[4].asWritableIntChunk().add(nugget.getDepth());
+ chunks[chunkIdx++].asWritableIntChunk().add(nugget.getDepth());
// ColumnDefinition.ofString("SessionId"),
- chunks[5].asWritableObjectChunk().add(nugget.getSessionId());
+ chunks[chunkIdx++].asWritableObjectChunk().add(nugget.getSessionId());
// ColumnDefinition.ofString("Description"),
- chunks[6].asWritableObjectChunk().add(nugget.getDescription());
+ chunks[chunkIdx++].asWritableObjectChunk().add(nugget.getDescription());
// ColumnDefinition.ofString("CallerLine"),
- chunks[7].asWritableObjectChunk().add(nugget.getCallerLine());
+ chunks[chunkIdx++].asWritableObjectChunk().add(nugget.getCallerLine());
// ColumnDefinition.ofBoolean("IsCompilation"),
- chunks[8].asWritableByteChunk().add(BooleanUtils.booleanAsByte(nugget.isCompilation()));
+ chunks[chunkIdx++].asWritableByteChunk().add(BooleanUtils.booleanAsByte(nugget.isCompilation()));
// ColumnDefinition.ofTime("StartTime"),
- chunks[9].asWritableLongChunk().add(nugget.getStartClockEpochNanos());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getStartClockEpochNanos());
// ColumnDefinition.ofTime("EndTime"),
- chunks[10].asWritableLongChunk().add(nugget.getEndClockEpochNanos());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getEndClockEpochNanos());
// ColumnDefinition.ofLong("UsageNanos"),
- chunks[11].asWritableLongChunk().add(nugget.getUsageNanos());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getUsageNanos());
// ColumnDefinition.ofLong("CpuNanos"),
- chunks[12].asWritableLongChunk().add(nugget.getCpuNanos());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getCpuNanos());
// ColumnDefinition.ofLong("UserCpuNanos"),
- chunks[13].asWritableLongChunk().add(nugget.getUserCpuNanos());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getUserCpuNanos());
// ColumnDefinition.ofLong("FreeMemory"),
- chunks[14].asWritableLongChunk().add(nugget.getEndFreeMemory());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getEndFreeMemory());
// ColumnDefinition.ofLong("TotalMemory"),
- chunks[15].asWritableLongChunk().add(nugget.getEndTotalMemory());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getEndTotalMemory());
// ColumnDefinition.ofLong("FreeMemoryChange"),
- chunks[16].asWritableLongChunk().add(nugget.getDiffFreeMemory());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getDiffFreeMemory());
// ColumnDefinition.ofLong("TotalMemoryChange"),
- chunks[17].asWritableLongChunk().add(nugget.getDiffTotalMemory());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getDiffTotalMemory());
// ColumnDefinition.ofLong("Collections")
- chunks[18].asWritableLongChunk().add(nugget.getDiffCollections());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getDiffCollections());
// ColumnDefinition.ofLong("CollectionTimeNanos"),
- chunks[19].asWritableLongChunk().add(nugget.getDiffCollectionTimeNanos());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getDiffCollectionTimeNanos());
// ColumnDefinition.ofLong("AllocatedBytes"),
- chunks[20].asWritableLongChunk().add(nugget.getAllocatedBytes());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getAllocatedBytes());
// ColumnDefinition.ofLong("PoolAllocatedBytes"),
- chunks[21].asWritableLongChunk().add(nugget.getPoolAllocatedBytes());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getPoolAllocatedBytes());
// ColumnDefinition.ofLong("InputSizeLong"),
- chunks[22].asWritableLongChunk().add(nugget.getInputSize());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getInputSize());
+
+ // ColumnDefinition.ofLong("DataReadNanos"),
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getDataReadNanos());
+
+ // ColumnDefinition.ofLong("DataReadCount"),
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getDataReadCount());
+
+ // ColumnDefinition.ofLong("DataReadBytes"),
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getDataReadBytes());
+
+ // ColumnDefinition.ofLong("MetadataReadNanos"),
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getMetadataReadNanos());
+
+ // ColumnDefinition.ofLong("MetadataReadCount"),
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getMetadataReadCount());
// ColumnDefinition.ofBoolean("WasInterrupted")
- chunks[23].asWritableByteChunk().add(BooleanUtils.booleanAsByte(nugget.wasInterrupted()));
+ chunks[chunkIdx++].asWritableByteChunk().add(BooleanUtils.booleanAsByte(nugget.wasInterrupted()));
// ColumnDefinition.ofString("AuthContext")
- chunks[24].asWritableObjectChunk().add(Objects.toString(nugget.getAuthContext()));
+ chunks[chunkIdx++].asWritableObjectChunk().add(Objects.toString(nugget.getAuthContext()));
// ColumnDefinition.ofLong("WorkerHeapSize")
- chunks[25].asWritableLongChunk().add(heapSize);
+ chunks[chunkIdx++].asWritableLongChunk().add(heapSize);
if (chunks[0].size() == CHUNK_SIZE) {
flushInternal();
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryPerformanceStreamPublisher.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryPerformanceStreamPublisher.java
index 6d6b49ce732..f09f076a282 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryPerformanceStreamPublisher.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/util/QueryPerformanceStreamPublisher.java
@@ -38,6 +38,11 @@ class QueryPerformanceStreamPublisher implements StreamPublisher {
ColumnDefinition.ofLong("CollectionTimeNanos"),
ColumnDefinition.ofLong("AllocatedBytes"),
ColumnDefinition.ofLong("PoolAllocatedBytes"),
+ ColumnDefinition.ofLong("DataReadNanos"),
+ ColumnDefinition.ofLong("DataReadCount"),
+ ColumnDefinition.ofLong("DataReadBytes"),
+ ColumnDefinition.ofLong("MetadataReadNanos"),
+ ColumnDefinition.ofLong("MetadataReadCount"),
ColumnDefinition.ofBoolean("WasInterrupted"),
ColumnDefinition.ofString("Exception"),
ColumnDefinition.ofString("AuthContext"),
@@ -68,69 +73,85 @@ public void register(@NotNull StreamConsumer consumer) {
public synchronized void add(
@NotNull final QueryPerformanceNugget nugget,
@Nullable final Exception exception) {
+ int chunkIdx = 0;
// ColumnDefinition.ofLong("EvaluationNumber")
- chunks[0].asWritableLongChunk().add(nugget.getEvaluationNumber());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getEvaluationNumber());
// ColumnDefinition.ofLong("ParentEvaluationNumber")
- chunks[1].asWritableLongChunk().add(nugget.getParentEvaluationNumber());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getParentEvaluationNumber());
// ColumnDefinition.ofString("SessionId")
- chunks[2].asWritableObjectChunk().add(nugget.getSessionId());
+ chunks[chunkIdx++].asWritableObjectChunk().add(nugget.getSessionId());
// ColumnDefinition.ofString("Description")
- chunks[3].asWritableObjectChunk().add(nugget.getDescription());
+ chunks[chunkIdx++].asWritableObjectChunk().add(nugget.getDescription());
// ColumnDefinition.ofTime("StartTime");
- chunks[4].asWritableLongChunk().add(nugget.getStartClockEpochNanos());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getStartClockEpochNanos());
// ColumnDefinition.ofTime("EndTime")
- chunks[5].asWritableLongChunk().add(nugget.getEndClockEpochNanos());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getEndClockEpochNanos());
// ColumnDefinition.ofLong("UsageNanos")
- chunks[6].asWritableLongChunk().add(nugget.getUsageNanos());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getUsageNanos());
// ColumnDefinition.ofLong("CpuNanos")
- chunks[7].asWritableLongChunk().add(nugget.getCpuNanos());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getCpuNanos());
// ColumnDefinition.ofLong("UserCpuNanos")
- chunks[8].asWritableLongChunk().add(nugget.getUserCpuNanos());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getUserCpuNanos());
// ColumnDefinition.ofLong("FreeMemory")
- chunks[9].asWritableLongChunk().add(nugget.getEndFreeMemory());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getEndFreeMemory());
// ColumnDefinition.ofLong("TotalMemory")
- chunks[10].asWritableLongChunk().add(nugget.getEndTotalMemory());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getEndTotalMemory());
// ColumnDefinition.ofLong("FreeMemoryChange")
- chunks[11].asWritableLongChunk().add(nugget.getDiffFreeMemory());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getDiffFreeMemory());
// ColumnDefinition.ofLong("TotalMemoryChange")
- chunks[12].asWritableLongChunk().add(nugget.getDiffTotalMemory());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getDiffTotalMemory());
// ColumnDefinition.ofLong("Collections")
- chunks[13].asWritableLongChunk().add(nugget.getDiffCollections());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getDiffCollections());
// ColumnDefinition.ofLong("CollectionTimeNanos")
- chunks[14].asWritableLongChunk().add(nugget.getDiffCollectionTimeNanos());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getDiffCollectionTimeNanos());
// ColumnDefinition.ofLong("AllocatedBytes")
- chunks[15].asWritableLongChunk().add(nugget.getAllocatedBytes());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getAllocatedBytes());
// ColumnDefinition.ofLong("PoolAllocatedBytes")
- chunks[16].asWritableLongChunk().add(nugget.getPoolAllocatedBytes());
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getPoolAllocatedBytes());
+
+ // ColumnDefinition.ofLong("DataReadNanos")
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getDataReadNanos());
+
+ // ColumnDefinition.ofLong("DataReadCount")
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getDataReadCount());
+
+ // ColumnDefinition.ofLong("DataReadBytes")
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getDataReadBytes());
+
+ // ColumnDefinition.ofLong("MetadataReadNanos")
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getMetadataReadNanos());
+
+ // ColumnDefinition.ofLong("MetadataReadCount")
+ chunks[chunkIdx++].asWritableLongChunk().add(nugget.getMetadataReadCount());
// ColumnDefinition.ofBoolean("WasInterrupted")
- chunks[17].asWritableByteChunk().add(BooleanUtils.booleanAsByte(nugget.wasInterrupted()));
+ chunks[chunkIdx++].asWritableByteChunk().add(BooleanUtils.booleanAsByte(nugget.wasInterrupted()));
// ColumnDefinition.ofString("Exception")
- chunks[18].asWritableObjectChunk().add(exception == null ? null : exception.getMessage());
+ chunks[chunkIdx++].asWritableObjectChunk().add(exception == null ? null : exception.getMessage());
// ColumnDefinition.ofString("AuthContext")
- chunks[19].asWritableObjectChunk().add(Objects.toString(nugget.getAuthContext()));
+ chunks[chunkIdx++].asWritableObjectChunk().add(Objects.toString(nugget.getAuthContext()));
// ColumnDefinition.ofLong("WorkerHeapSize")
- chunks[20].asWritableLongChunk().add(heapSize);
+ chunks[chunkIdx++].asWritableLongChunk().add(heapSize);
if (chunks[0].size() == CHUNK_SIZE) {
flushInternal();
diff --git a/engine/table/src/main/java/io/deephaven/engine/util/file/FileHandle.java b/engine/table/src/main/java/io/deephaven/engine/util/file/FileHandle.java
index dda17e8200e..8876c6e9155 100644
--- a/engine/table/src/main/java/io/deephaven/engine/util/file/FileHandle.java
+++ b/engine/table/src/main/java/io/deephaven/engine/util/file/FileHandle.java
@@ -8,6 +8,7 @@
import io.deephaven.base.stats.Value;
import io.deephaven.base.verify.Require;
import io.deephaven.configuration.Configuration;
+import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorderState;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -198,7 +199,9 @@ public final long size() throws IOException {
try {
return fileChannel.size();
} finally {
- SIZE_DURATION_NANOS.sample(System.nanoTime() - startTimeNanos);
+ final long duration = System.nanoTime() - startTimeNanos;
+ SIZE_DURATION_NANOS.sample(duration);
+ QueryPerformanceRecorderState.recordMetadataOperation("size", duration);
}
} catch (ClosedChannelException e) {
postCloseProcedure.run();
@@ -268,13 +271,15 @@ public final FileHandle position(long newPosition) throws IOException {
public final int read(@NotNull final ByteBuffer destination, final long position) throws IOException {
try {
final long startTimeNanos = System.nanoTime();
- final int sizeBytes = destination.remaining();
- try {
- return fileChannel.read(destination, position);
- } finally {
- READ_DURATION_NANOS.sample(System.nanoTime() - startTimeNanos);
- READ_SIZE_BYTES.sample(sizeBytes);
+ final int readSize = fileChannel.read(destination, position);
+ if (readSize > 0) {
+ final long duration = System.nanoTime() - startTimeNanos;
+ QueryPerformanceRecorderState.recordRead(duration, readSize);
+ READ_DURATION_NANOS.sample(duration);
+ READ_SIZE_BYTES.sample(readSize);
}
+
+ return readSize;
} catch (ClosedChannelException e) {
postCloseProcedure.run();
throw e;
@@ -295,13 +300,14 @@ public final int read(@NotNull final ByteBuffer destination, final long position
public final int read(@NotNull final ByteBuffer destination) throws IOException {
try {
final long startTimeNanos = System.nanoTime();
- final int sizeBytes = destination.remaining();
- try {
- return fileChannel.read(destination);
- } finally {
- READ_DURATION_NANOS.sample(System.nanoTime() - startTimeNanos);
- READ_SIZE_BYTES.sample(sizeBytes);
+ final int readSize = fileChannel.read(destination);
+ if (readSize > 0) {
+ final long duration = System.nanoTime() - startTimeNanos;
+ QueryPerformanceRecorderState.recordRead(duration, readSize);
+ READ_DURATION_NANOS.sample(duration);
+ READ_SIZE_BYTES.sample(readSize);
}
+ return readSize;
} catch (ClosedChannelException e) {
postCloseProcedure.run();
throw e;
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java
index b33f877dcc0..c7de3b3bbd2 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTools.java
@@ -19,6 +19,7 @@
import io.deephaven.engine.table.impl.TableUpdateMode;
import io.deephaven.engine.table.impl.locations.util.PartitionFormatter;
import io.deephaven.engine.table.impl.locations.util.TableDataRefreshService;
+import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorder;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.parquet.base.ParquetMetadataFileWriter;
import io.deephaven.parquet.base.NullParquetMetadataFileWriter;
@@ -127,29 +128,31 @@ public static Table readTable(@NotNull final String source) {
public static Table readTable(
@NotNull final String source,
@NotNull final ParquetInstructions readInstructions) {
- final boolean isParquetFile = ParquetUtils.isParquetFile(source);
- final boolean isMetadataFile = !isParquetFile && ParquetUtils.isMetadataFile(source);
- final boolean isDirectory = !isParquetFile && !isMetadataFile;
- final URI sourceURI = convertToURI(source, isDirectory);
- if (readInstructions.getFileLayout().isPresent()) {
- switch (readInstructions.getFileLayout().get()) {
- case SINGLE_FILE:
- return readSingleFileTable(sourceURI, readInstructions);
- case FLAT_PARTITIONED:
- return readFlatPartitionedTable(sourceURI, readInstructions);
- case KV_PARTITIONED:
- return readKeyValuePartitionedTable(sourceURI, readInstructions, null);
- case METADATA_PARTITIONED:
- return readPartitionedTableWithMetadata(sourceURI, readInstructions, null);
+ return QueryPerformanceRecorder.withNugget("ParquetTools.readTable(" + source + ")", () -> {
+ final boolean isParquetFile = ParquetUtils.isParquetFile(source);
+ final boolean isMetadataFile = !isParquetFile && ParquetUtils.isMetadataFile(source);
+ final boolean isDirectory = !isParquetFile && !isMetadataFile;
+ final URI sourceURI = convertToURI(source, isDirectory);
+ if (readInstructions.getFileLayout().isPresent()) {
+ switch (readInstructions.getFileLayout().get()) {
+ case SINGLE_FILE:
+ return readSingleFileTable(sourceURI, readInstructions);
+ case FLAT_PARTITIONED:
+ return readFlatPartitionedTable(sourceURI, readInstructions);
+ case KV_PARTITIONED:
+ return readKeyValuePartitionedTable(sourceURI, readInstructions, null);
+ case METADATA_PARTITIONED:
+ return readPartitionedTableWithMetadata(sourceURI, readInstructions, null);
+ }
}
- }
- if (isParquetFile) {
- return readSingleFileTable(sourceURI, readInstructions);
- }
- if (isMetadataFile) {
- return readPartitionedTableWithMetadata(sourceURI, readInstructions, null);
- }
- return readPartitionedTableDirectory(sourceURI, readInstructions);
+ if (isParquetFile) {
+ return readSingleFileTable(sourceURI, readInstructions);
+ }
+ if (isMetadataFile) {
+ return readPartitionedTableWithMetadata(sourceURI, readInstructions, null);
+ }
+ return readPartitionedTableDirectory(sourceURI, readInstructions);
+ });
}
/**
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFlatPartitionedLayout.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFlatPartitionedLayout.java
index 7b084709f79..ccc9fc54e33 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFlatPartitionedLayout.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetFlatPartitionedLayout.java
@@ -5,6 +5,7 @@
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder;
+import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorderState;
import io.deephaven.parquet.base.ParquetUtils;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.location.ParquetTableLocationKey;
@@ -60,6 +61,7 @@ public void findKeys(@NotNull final Consumer locationKe
} else {
uriFilter = uri -> uri.getPath().endsWith(ParquetUtils.PARQUET_FILE_EXTENSION);
}
+ final long start = System.nanoTime();
try (final Stream stream = channelsProvider.list(tableRootDirectory)) {
stream.filter(uriFilter).forEach(uri -> {
cache.compute(uri, (key, existingLocationKey) -> {
@@ -74,6 +76,8 @@ public void findKeys(@NotNull final Consumer locationKe
});
} catch (final IOException e) {
throw new TableDataException("Error finding parquet locations under " + tableRootDirectory, e);
+ } finally {
+ QueryPerformanceRecorderState.recordMetadataOperation("list", System.nanoTime() - start);
}
}
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java
index 6ab3a18afe5..8ffdb8b7483 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/layout/ParquetKeyValuePartitionedLayout.java
@@ -12,6 +12,7 @@
import io.deephaven.engine.table.impl.locations.local.LocationTableBuilderDefinition;
import io.deephaven.engine.table.impl.locations.local.URIStreamKeyValuePartitionLayout;
import io.deephaven.engine.table.impl.locations.local.KeyValuePartitionLayout;
+import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorderState;
import io.deephaven.parquet.base.ParquetUtils;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.location.ParquetTableLocationKey;
@@ -127,10 +128,13 @@ public final void findKeys(@NotNull final Consumer loca
} else {
uriFilter = uri -> uri.getPath().endsWith(ParquetUtils.PARQUET_FILE_EXTENSION);
}
+ final long start = System.nanoTime();
try (final Stream filteredUriStream = channelsProvider.walk(tableRootDirectory).filter(uriFilter)) {
findKeys(filteredUriStream, locationKeyObserver);
} catch (final IOException e) {
throw new TableDataException("Error finding parquet locations under " + tableRootDirectory, e);
+ } finally {
+ QueryPerformanceRecorderState.recordMetadataOperation("walk", System.nanoTime() - start);
}
}
}
diff --git a/extensions/s3/build.gradle b/extensions/s3/build.gradle
index a149d6d24bb..461dbee6fb9 100644
--- a/extensions/s3/build.gradle
+++ b/extensions/s3/build.gradle
@@ -16,6 +16,7 @@ dependencies {
implementation project(':Configuration')
implementation project(':log-factory')
implementation project(':util-thread')
+ implementation project(':engine-table')
implementation platform(libs.awssdk.bom)
implementation libs.awssdk.s3
diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ReadContext.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ReadContext.java
index 87fbc6c5770..5778e056be6 100644
--- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ReadContext.java
+++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ReadContext.java
@@ -3,6 +3,7 @@
//
package io.deephaven.extensions.s3;
+import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorderState;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.channel.SeekableChannelContext;
@@ -112,6 +113,7 @@ int fill(final long position, final ByteBuffer dest) throws IOException {
final long totalRemainingFragments = numFragments - firstFragmentIx - 1;
readAhead = Math.min(Math.max(impliedReadAhead, desiredReadAhead), totalRemainingFragments);
}
+ final long startNanos = System.nanoTime();
int filled;
{
// Hold a reference to the first request to ensure it is not evicted from the cache
@@ -131,6 +133,10 @@ int fill(final long position, final ByteBuffer dest) throws IOException {
// non-blocking since we know isDone
filled += readAheadRequest.fill(position + filled, dest);
}
+ // We only record the time we spent on blocking reads. We record the bytes that we've read either in the first
+ // fragment or in the non-blocking read-ahead fragments.
+ final long durationNanos = System.nanoTime() - startNanos;
+ QueryPerformanceRecorderState.recordRead(durationNanos, filled);
return filled;
}
diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ReadRequest.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ReadRequest.java
index 51fbd6d191c..e6808a6123a 100644
--- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ReadRequest.java
+++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ReadRequest.java
@@ -4,6 +4,9 @@
package io.deephaven.extensions.s3;
import io.deephaven.base.reference.CleanupReference;
+import io.deephaven.base.stats.State;
+import io.deephaven.base.stats.Stats;
+import io.deephaven.base.stats.Value;
import io.deephaven.base.verify.Require;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
@@ -45,6 +48,11 @@ final class S3ReadRequest extends SoftReference
implements AsyncResponseTransformer, BiConsumer,
CleanupReference {
+ private static final Value READ_DURATION_NANOS =
+ Stats.makeItem("S3ReadRequest", "readDurationNanos", State.FACTORY).getValue();
+ private static final Value READ_SIZE_BYTES =
+ Stats.makeItem("S3ReadRequest", "readSizeBytes", State.FACTORY).getValue();
+
/**
* A unique identifier for a request, consisting of the URI and fragment index.
*/
@@ -89,6 +97,10 @@ public boolean equals(Object obj) {
private final Instant createdAt;
private volatile CompletableFuture consumerFuture;
private volatile CompletableFuture producerFuture;
+ /**
+ * The System.nanoTime at which we sent this request.
+ */
+ private volatile long startNanos;
private int fillCount;
private long fillBytes;
private final S3ReadRequestCache sharedCache;
@@ -166,6 +178,7 @@ private void sendImpl() {
if (log.isDebugEnabled()) {
log.debug().append("Sending: ").append(requestStr()).endl();
}
+ startNanos = System.nanoTime();
final CompletableFuture ret = client.getObject(getObjectRequest(), this);
ret.whenComplete(this);
consumerFuture = ret;
@@ -271,6 +284,15 @@ public void cleanup() {
@Override
public void accept(final Boolean isComplete, final Throwable throwable) {
+ // We record the duration here, but do not sample into the QueryPerformanceRecorderState because this method is
+ // dispatched asynchronously and the read must be accounted for on the thread that is waiting for it. This in part
+ // means that any read-ahead performed but never waited for is not accounted for in our read statistics for the
+ // query or update. We do, however, include all of the requests in our statistics objects.
+ final long durationNanos = System.nanoTime() - startNanos;
+ READ_DURATION_NANOS.sample(durationNanos);
+ READ_SIZE_BYTES.sample(requestLength());
+ // We could discriminate between complete and incomplete requests, but it seems reasonable to count both
+ // successful and unsuccessful reads in our statistics.
if (log.isDebugEnabled()) {
final Instant completedAt = Instant.now();
if (Boolean.TRUE.equals(isComplete)) {
diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java
index 7f6fd67009f..d8f0f402436 100644
--- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java
+++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java
@@ -6,6 +6,7 @@
import io.deephaven.UncheckedDeephavenException;
import io.deephaven.base.verify.Assert;
import io.deephaven.base.verify.Require;
+import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorderState;
import io.deephaven.hash.KeyedObjectHashMap;
import io.deephaven.hash.KeyedObjectKey;
import io.deephaven.internal.log.LoggerFactory;
@@ -327,12 +328,16 @@ long fetchFileSize(@NotNull final S3Uri s3Uri) throws IOException {
.key(s3Uri.key().orElseThrow());
final Duration readTimeout = s3Instructions.readTimeout();
requestBuilder.overrideConfiguration(b -> addTimeout(b, readTimeout));
+ final long start = System.nanoTime();
final CompletableFuture responseFuture = s3AsyncClient.headObject(requestBuilder.build());
try {
headObjectResponse = responseFuture.get(readTimeout.toNanos(), TimeUnit.NANOSECONDS);
} catch (final InterruptedException | ExecutionException | TimeoutException | CancellationException e) {
responseFuture.cancel(true);
throw handleS3Exception(e, String.format("fetching HEAD for file %s", s3Uri), s3Instructions);
+ } finally {
+ final long duration = System.nanoTime() - start;
+ QueryPerformanceRecorderState.recordMetadataOperation("size", duration);
}
final long fileSize = headObjectResponse.contentLength();
updateFileSizeCache(s3Uri.uri(), fileSize);
diff --git a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleTestBase.java b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleTestBase.java
index 91e399c46ce..3b3100e6a8c 100644
--- a/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleTestBase.java
+++ b/extensions/s3/src/test/java/io/deephaven/extensions/s3/S3SeekableChannelSimpleTestBase.java
@@ -3,6 +3,7 @@
//
package io.deephaven.extensions.s3;
+import io.deephaven.engine.table.impl.perf.QueryPerformanceRecorderState;
import io.deephaven.extensions.s3.testlib.S3SeekableChannelTestSetup;
import io.deephaven.util.channel.CachedChannelProvider;
import io.deephaven.util.channel.CompletableOutputStream;
@@ -86,6 +87,9 @@ public int read() {
}, (long) numBytes, executor));
final URI uri = uri("32MiB.bin");
final ByteBuffer buffer = ByteBuffer.allocate(1);
+ final long startTime = System.nanoTime();
+ final long startReadBytes = QueryPerformanceRecorderState.getReadTrackerForCurrentThread().getDataReadBytes();
+ final long startReadNanos = QueryPerformanceRecorderState.getReadTrackerForCurrentThread().getDataReadNanos();
try (
final SeekableChannelsProvider providerImpl = providerImpl();
final SeekableChannelsProvider provider = CachedChannelProvider.create(providerImpl, 32);
@@ -98,6 +102,15 @@ public int read() {
}
assertThat(readChannel.read(buffer)).isEqualTo(-1);
}
+ final long endTime = System.nanoTime();
+ final long endReadBytes = QueryPerformanceRecorderState.getReadTrackerForCurrentThread().getDataReadBytes();
+ final long endReadNanos = QueryPerformanceRecorderState.getReadTrackerForCurrentThread().getDataReadNanos();
+ assertThat(endReadBytes - startReadBytes).isEqualTo(numBytes);
+ final long duration = endReadNanos - startReadNanos;
+ // we need to record some time
+ assertThat(duration).isGreaterThan(0);
+ // but don't want to double count
+ assertThat(duration).isLessThan(endTime - startTime);
}
@Test