@@ -16,6 +16,7 @@ package external
1616
1717import (
1818 "context"
19+ "encoding/hex"
1920 "flag"
2021 "fmt"
2122 "io"
@@ -27,6 +28,7 @@ import (
2728
2829 "github.com/docker/go-units"
2930 "github.com/felixge/fgprof"
31+ "github.com/pingcap/tidb/br/pkg/membuf"
3032 "github.com/pingcap/tidb/br/pkg/storage"
3133 "github.com/pingcap/tidb/pkg/kv"
3234 "github.com/pingcap/tidb/pkg/util/intest"
@@ -43,11 +45,17 @@ type writeTestSuite struct {
4345 memoryLimit int
4446 beforeCreateWriter func ()
4547 afterWriterClose func ()
48+
49+ optionalFilePath string
50+ onClose OnCloseFunc
4651}
4752
4853func writePlainFile (s * writeTestSuite ) {
4954 ctx := context .Background ()
5055 filePath := "/test/writer"
56+ if s .optionalFilePath != "" {
57+ filePath = s .optionalFilePath
58+ }
5159 _ = s .store .DeleteFile (ctx , filePath )
5260 buf := make ([]byte , s .memoryLimit )
5361 offset := 0
@@ -92,9 +100,13 @@ func cleanOldFiles(ctx context.Context, store storage.ExternalStorage, subDir st
92100func writeExternalFile (s * writeTestSuite ) {
93101 ctx := context .Background ()
94102 filePath := "/test/writer"
103+ if s .optionalFilePath != "" {
104+ filePath = s .optionalFilePath
105+ }
95106 cleanOldFiles (ctx , s .store , filePath )
96107 builder := NewWriterBuilder ().
97- SetMemorySizeLimit (uint64 (s .memoryLimit ))
108+ SetMemorySizeLimit (uint64 (s .memoryLimit )).
109+ SetOnCloseFunc (s .onClose )
98110
99111 if s .beforeCreateWriter != nil {
100112 s .beforeCreateWriter ()
@@ -116,6 +128,9 @@ func writeExternalFile(s *writeTestSuite) {
116128func writeExternalOneFile (s * writeTestSuite ) {
117129 ctx := context .Background ()
118130 filePath := "/test/writer"
131+ if s .optionalFilePath != "" {
132+ filePath = s .optionalFilePath
133+ }
119134 cleanOldFiles (ctx , s .store , filePath )
120135 builder := NewWriterBuilder ().
121136 SetMemorySizeLimit (uint64 (s .memoryLimit ))
@@ -126,13 +141,21 @@ func writeExternalOneFile(s *writeTestSuite) {
126141 writer := builder .BuildOneFile (
127142 s .store , filePath , "writerID" )
128143 intest .AssertNoError (writer .Init (ctx , 20 * 1024 * 1024 ))
144+ var minKey , maxKey []byte
145+
129146 key , val , _ := s .source .next ()
147+ minKey = key
130148 for key != nil {
149+ maxKey = key
131150 err := writer .WriteRow (ctx , key , val )
132151 intest .AssertNoError (err )
133152 key , val , _ = s .source .next ()
134153 }
135154 intest .AssertNoError (writer .Close (ctx ))
155+ s .onClose (& WriterSummary {
156+ Min : minKey ,
157+ Max : maxKey ,
158+ })
136159 if s .afterWriterClose != nil {
137160 s .afterWriterClose ()
138161 }
@@ -674,3 +697,55 @@ func TestMergeBench(t *testing.T) {
674697 testCompareMergeWithContent (t , 8 , createAscendingFiles , newMergeStep )
675698 testCompareMergeWithContent (t , 8 , createEvenlyDistributedFiles , newMergeStep )
676699}
700+
701+ func TestReadAllDataLargeFiles (t * testing.T ) {
702+ ctx := context .Background ()
703+ store := openTestingStorage (t )
704+
705+ // ~ 100B * 20M = 2GB
706+ source := newAscendingKeyAsyncSource (20 * 1024 * 1024 , 10 , 90 , nil )
707+ // ~ 1KB * 2M = 2GB
708+ source2 := newAscendingKeyAsyncSource (2 * 1024 * 1024 , 10 , 990 , nil )
709+ var minKey , maxKey kv.Key
710+ recordMinMax := func (s * WriterSummary ) {
711+ minKey = s .Min
712+ maxKey = s .Max
713+ }
714+ suite := & writeTestSuite {
715+ store : store ,
716+ source : source ,
717+ memoryLimit : 256 * 1024 * 1024 ,
718+ optionalFilePath : "/test/file" ,
719+ onClose : recordMinMax ,
720+ }
721+ suite2 := & writeTestSuite {
722+ store : store ,
723+ source : source2 ,
724+ memoryLimit : 256 * 1024 * 1024 ,
725+ optionalFilePath : "/test/file2" ,
726+ onClose : recordMinMax ,
727+ }
728+ writeExternalOneFile (suite )
729+ t .Logf ("minKey: %s, maxKey: %s" , minKey , maxKey )
730+ writeExternalOneFile (suite2 )
731+ t .Logf ("minKey: %s, maxKey: %s" , minKey , maxKey )
732+
733+ dataFiles , statFiles , err := GetAllFileNames (ctx , store , "" )
734+ intest .AssertNoError (err )
735+ intest .Assert (len (dataFiles ) == 2 )
736+
737+ // choose the two keys so that expected concurrency is 579 and 19
738+ startKey , err := hex .DecodeString ("00000001000000000000" )
739+ intest .AssertNoError (err )
740+ endKey , err := hex .DecodeString ("00a00000000000000000" )
741+ intest .AssertNoError (err )
742+ bufPool := membuf .NewPool (
743+ membuf .WithBlockNum (0 ),
744+ membuf .WithBlockSize (ConcurrentReaderBufferSizePerConc ),
745+ )
746+ output := & memKVsAndBuffers {}
747+ now := time .Now ()
748+ err = readAllData (ctx , store , dataFiles , statFiles , startKey , endKey , bufPool , output )
749+ t .Logf ("read all data cost: %s" , time .Since (now ))
750+ intest .AssertNoError (err )
751+ }
0 commit comments