Skip to content

Commit f62665b

Browse files
authored
Merge pull request #15 from zanmato1984/tiforth
*: merge tiforth to master for Gate D blocker pingcap#349
2 parents 54392f5 + dc14c7e commit f62665b

File tree

8 files changed

+1258
-1
lines changed

8 files changed

+1258
-1
lines changed

pkg/executor/test/jointest/hashjoin/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ go_test(
99
],
1010
flaky = True,
1111
race = "on",
12-
shard_count = 21,
12+
shard_count = 23,
1313
deps = [
1414
"//pkg/config",
1515
"//pkg/executor/join",
Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
// Copyright 2026 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//go:build tiforth_adapter && cgo
16+
17+
package hashjoin
18+
19+
import (
20+
"fmt"
21+
"sync"
22+
"testing"
23+
24+
"github.com/pingcap/tidb/pkg/testkit"
25+
"github.com/pingcap/tidb/pkg/tiforthadapter"
26+
"github.com/stretchr/testify/require"
27+
)
28+
29+
func TestTiForthHostV2CastDonorNativeEntryParitySerialAndParallel(t *testing.T) {
30+
wantRows, wantWarnings := runDonorNativeCastBaseline(t)
31+
32+
t.Run("serial", func(t *testing.T) {
33+
borrowRows, borrowWarnings, err := tiforthadapter.RunHostV2TruncateAsWarningDecimalCast(false)
34+
require.NoError(t, err)
35+
require.Equal(t, wantWarnings, borrowWarnings)
36+
require.Equal(t, wantRows, borrowRows)
37+
38+
foreignRows, foreignWarnings, err := tiforthadapter.RunHostV2TruncateAsWarningDecimalCast(true)
39+
require.NoError(t, err)
40+
require.Equal(t, wantWarnings, foreignWarnings)
41+
require.Equal(t, wantRows, foreignRows)
42+
})
43+
44+
t.Run("parallel", func(t *testing.T) {
45+
runParallelParity(
46+
t,
47+
4,
48+
func(worker int) ([]string, uint32, error) {
49+
foreignRetainable := worker%2 == 1
50+
return tiforthadapter.RunHostV2TruncateAsWarningDecimalCast(foreignRetainable)
51+
},
52+
wantRows,
53+
wantWarnings,
54+
)
55+
})
56+
}
57+
58+
func TestTiForthHostV2InnerHashJoinDonorNativeEntryParitySerialAndParallel(t *testing.T) {
59+
wantRows, wantWarnings := runDonorNativeInnerHashJoinBaseline(t)
60+
61+
t.Run("serial", func(t *testing.T) {
62+
borrowRows, borrowWarnings, err := tiforthadapter.RunHostV2InnerHashJoinPayloadRows(1, false)
63+
require.NoError(t, err)
64+
require.Equal(t, wantWarnings, borrowWarnings)
65+
require.Equal(t, wantRows, borrowRows)
66+
67+
foreignRows, foreignWarnings, err := tiforthadapter.RunHostV2InnerHashJoinPayloadRows(1, true)
68+
require.NoError(t, err)
69+
require.Equal(t, wantWarnings, foreignWarnings)
70+
require.Equal(t, wantRows, foreignRows)
71+
72+
highPartitionRows, highPartitionWarnings, err := tiforthadapter.RunHostV2InnerHashJoinPayloadRows(8, true)
73+
require.NoError(t, err)
74+
require.Equal(t, wantWarnings, highPartitionWarnings)
75+
require.Equal(t, wantRows, highPartitionRows)
76+
})
77+
78+
t.Run("parallel", func(t *testing.T) {
79+
runParallelParity(
80+
t,
81+
4,
82+
func(worker int) ([]string, uint32, error) {
83+
foreignRetainable := worker%2 == 1
84+
partitions := 2
85+
if foreignRetainable {
86+
partitions = 8
87+
}
88+
return tiforthadapter.RunHostV2InnerHashJoinPayloadRows(partitions, foreignRetainable)
89+
},
90+
wantRows,
91+
wantWarnings,
92+
)
93+
})
94+
}
95+
96+
func runParallelParity(
97+
t *testing.T,
98+
workers int,
99+
run func(worker int) ([]string, uint32, error),
100+
wantRows []string,
101+
wantWarnings uint32,
102+
) {
103+
t.Helper()
104+
105+
errCh := make(chan error, workers)
106+
var wg sync.WaitGroup
107+
108+
for worker := 0; worker < workers; worker++ {
109+
wg.Add(1)
110+
go func(worker int) {
111+
defer wg.Done()
112+
113+
rows, warnings, err := run(worker)
114+
if err != nil {
115+
errCh <- fmt.Errorf("worker %d run failed: %w", worker, err)
116+
return
117+
}
118+
if warnings != wantWarnings {
119+
errCh <- fmt.Errorf(
120+
"worker %d warning mismatch: got=%d want=%d",
121+
worker,
122+
warnings,
123+
wantWarnings,
124+
)
125+
return
126+
}
127+
if len(rows) != len(wantRows) {
128+
errCh <- fmt.Errorf(
129+
"worker %d row count mismatch: got=%d want=%d",
130+
worker,
131+
len(rows),
132+
len(wantRows),
133+
)
134+
return
135+
}
136+
for idx := range rows {
137+
if rows[idx] != wantRows[idx] {
138+
errCh <- fmt.Errorf(
139+
"worker %d row %d mismatch: got=%q want=%q",
140+
worker,
141+
idx,
142+
rows[idx],
143+
wantRows[idx],
144+
)
145+
return
146+
}
147+
}
148+
}(worker)
149+
}
150+
151+
wg.Wait()
152+
close(errCh)
153+
for err := range errCh {
154+
t.Fatal(err)
155+
}
156+
}
157+
158+
func runDonorNativeCastBaseline(t *testing.T) ([]string, uint32) {
159+
t.Helper()
160+
161+
store := testkit.CreateMockStore(t)
162+
tk := testkit.NewTestKit(t, store)
163+
tk.MustExec("use test")
164+
tk.MustExec("set @@sql_mode=''")
165+
166+
rows := tk.MustQuery(`
167+
select cast(v as decimal(10,2))
168+
from (
169+
select 1 as id, '1.239' as v
170+
union all select 2, '5.20'
171+
union all select 3, null
172+
) as src
173+
order by id
174+
`).Rows()
175+
176+
warnings := uint32(len(tk.MustQuery("show warnings").Rows()))
177+
require.Greater(t, warnings, uint32(0))
178+
return formatQueryRows(rows), warnings
179+
}
180+
181+
func runDonorNativeInnerHashJoinBaseline(t *testing.T) ([]string, uint32) {
182+
t.Helper()
183+
184+
store := testkit.CreateMockStore(t)
185+
tk := testkit.NewTestKit(t, store)
186+
tk.MustExec("use test")
187+
188+
rows := tk.MustQuery(`
189+
select b.payload, p.payload
190+
from (
191+
select 'k' as join_key, 10 as payload
192+
union all select 'k', 20
193+
union all select 'x', 30
194+
union all select null, 40
195+
) as b
196+
join (
197+
select 'k' as join_key, 100 as payload
198+
union all select 'x', 200
199+
union all select 'z', 300
200+
union all select null, 400
201+
) as p
202+
on b.join_key = p.join_key
203+
order by b.payload, p.payload
204+
`).Rows()
205+
206+
warnings := uint32(len(tk.MustQuery("show warnings").Rows()))
207+
return formatQueryRows(rows), warnings
208+
}
209+
210+
func formatQueryRows(rows [][]any) []string {
211+
formatted := make([]string, len(rows))
212+
for i, row := range rows {
213+
if len(row) == 0 {
214+
formatted[i] = ""
215+
continue
216+
}
217+
218+
text := normalizeQueryValue(row[0])
219+
for _, col := range row[1:] {
220+
text += " " + normalizeQueryValue(col)
221+
}
222+
formatted[i] = text
223+
}
224+
return formatted
225+
}
226+
227+
func normalizeQueryValue(value any) string {
228+
if value == nil {
229+
return "null"
230+
}
231+
switch v := value.(type) {
232+
case []byte:
233+
if string(v) == "<nil>" {
234+
return "null"
235+
}
236+
return string(v)
237+
case string:
238+
if v == "<nil>" {
239+
return "null"
240+
}
241+
return v
242+
default:
243+
return fmt.Sprint(v)
244+
}
245+
}

pkg/tiforthadapter/BUILD.bazel

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library")
2+
3+
go_library(
4+
name = "tiforthadapter",
5+
srcs = ["doc.go"],
6+
cgo = True,
7+
importpath = "github.com/pingcap/tidb/pkg/tiforthadapter",
8+
visibility = ["//visibility:public"],
9+
)

pkg/tiforthadapter/doc.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// Copyright 2026 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// Package tiforthadapter contains donor-side reusable adapter helpers for
16+
// TiForth host-v2 proving slices. These helpers are enabled only when the
17+
// `tiforth_adapter` build tag is provided.
18+
package tiforthadapter

0 commit comments

Comments
 (0)