@@ -105,6 +105,156 @@ export async function getLogsBatch(project, runs) {
105105 return await callApi ( "/get_logs_batch" , payload ) ;
106106}
107107
108+ function sqlString ( value ) {
109+ return `'${ String ( value ?? "" ) . replaceAll ( "'" , "''" ) } '` ;
110+ }
111+
112+ const DEFAULT_METRIC_MAX_POINTS = 1500 ;
113+
114+ function scalarLogsFromQueryRows ( rows ) {
115+ return ( rows || [ ] ) . map ( ( row ) => {
116+ const metrics = ( ( ) => {
117+ try {
118+ return JSON . parse ( row . metrics || "{}" ) ;
119+ } catch {
120+ return { } ;
121+ }
122+ } ) ( ) ;
123+ return {
124+ ...metrics ,
125+ timestamp : row . timestamp ,
126+ step : row . step ,
127+ } ;
128+ } ) ;
129+ }
130+
131+ function scalarOnlyLogs ( logs ) {
132+ return ( logs || [ ] ) . map ( ( row ) => {
133+ const out = {
134+ timestamp : row . timestamp ,
135+ step : row . step ,
136+ } ;
137+ for ( const [ key , value ] of Object . entries ( row ) ) {
138+ if ( key === "timestamp" || key === "step" ) continue ;
139+ if ( typeof value === "number" && Number . isFinite ( value ) ) out [ key ] = value ;
140+ }
141+ return out ;
142+ } ) ;
143+ }
144+
145+ async function queryProject ( project , query ) {
146+ return await callApi ( "/query_project" , { project, query } ) ;
147+ }
148+
149+ export async function getTraceStepCounts ( project , run ) {
150+ if ( await isStaticMode ( ) ) return [ ] ;
151+
152+ const normalized = normalizeRun ( run ) ;
153+ const where = normalized . run_id
154+ ? `m.run_id = ${ sqlString ( normalized . run_id ) } `
155+ : `m.run_name = ${ sqlString ( normalized . run ) } ` ;
156+ const result = await queryProject (
157+ project ,
158+ `
159+ WITH trace_metrics AS (
160+ SELECT
161+ m.step AS step,
162+ j.type AS value_type,
163+ j.value AS value
164+ FROM metrics m, json_each(CAST(m.metrics AS TEXT)) j
165+ WHERE ${ where }
166+ AND m.step IS NOT NULL
167+ AND j.type IN ('array', 'object')
168+ )
169+ SELECT
170+ step,
171+ SUM(
172+ CASE
173+ WHEN value_type = 'array' THEN json_array_length(value)
174+ ELSE 1
175+ END
176+ ) AS trace_count
177+ FROM trace_metrics
178+ WHERE (
179+ value_type = 'array'
180+ AND (
181+ json_extract(value, '$[0]._type') = 'trackio.trace'
182+ OR json_extract(value, '$[0].messages') IS NOT NULL
183+ )
184+ )
185+ OR (
186+ value_type = 'object'
187+ AND (
188+ json_extract(value, '$._type') = 'trackio.trace'
189+ OR json_extract(value, '$.messages') IS NOT NULL
190+ )
191+ )
192+ GROUP BY step
193+ ORDER BY step
194+ ` ,
195+ ) ;
196+ return ( result . rows || [ ] ) . map ( ( row ) => ( {
197+ step : Number ( row . step ) ,
198+ count : Number ( row . trace_count || 0 ) ,
199+ } ) ) . filter ( ( row ) => Number . isFinite ( row . step ) && row . count > 0 ) ;
200+ }
201+
202+ export async function getScalarLogsBatch ( project , runs ) {
203+ if ( await isStaticMode ( ) ) {
204+ const out = [ ] ;
205+ for ( const run of runs ) {
206+ const logs = await staticApi . getLogs ( project , run ) ;
207+ out . push ( { ...normalizeRun ( run ) , logs : scalarOnlyLogs ( logs ) } ) ;
208+ }
209+ return out ;
210+ }
211+
212+ const out = [ ] ;
213+ for ( const run of runs ) {
214+ const normalized = normalizeRun ( run ) ;
215+ const where = normalized . run_id
216+ ? `m.run_id = ${ sqlString ( normalized . run_id ) } `
217+ : `m.run_name = ${ sqlString ( normalized . run ) } ` ;
218+ const result = await queryProject (
219+ project ,
220+ `
221+ WITH scalar_rows AS (
222+ SELECT
223+ m.timestamp,
224+ m.step,
225+ json_group_object(j.key, j.value) AS metrics
226+ FROM metrics m, json_each(CAST(m.metrics AS TEXT)) j
227+ WHERE ${ where }
228+ AND j.type IN ('integer', 'real')
229+ GROUP BY m.id, m.timestamp, m.step
230+ ),
231+ numbered AS (
232+ SELECT
233+ timestamp,
234+ step,
235+ metrics,
236+ ROW_NUMBER() OVER (ORDER BY timestamp) - 1 AS row_index,
237+ COUNT(*) OVER () AS row_count
238+ FROM scalar_rows
239+ )
240+ SELECT timestamp, step, metrics
241+ FROM numbered
242+ WHERE row_count <= ${ DEFAULT_METRIC_MAX_POINTS }
243+ OR row_index = 0
244+ OR row_index = row_count - 1
245+ OR CAST(row_index * ${ DEFAULT_METRIC_MAX_POINTS } / row_count AS INTEGER)
246+ != CAST((row_index - 1) * ${ DEFAULT_METRIC_MAX_POINTS } / row_count AS INTEGER)
247+ ORDER BY timestamp
248+ ` ,
249+ ) ;
250+ out . push ( {
251+ ...normalized ,
252+ logs : scalarLogsFromQueryRows ( result . rows ) ,
253+ } ) ;
254+ }
255+ return out ;
256+ }
257+
108258export async function getTraces ( project , run , options = { } ) {
109259 const params = { project, ...normalizeRun ( run ) , ...options } ;
110260 if ( await isStaticMode ( ) ) return staticApi . getTraces ( project , run , options ) ;
0 commit comments