Skip to content

Commit 91eb9de

Browse files
committed
Use existing TSMReader from file store during compactions
Compactions would create their own TSMReaders for simplicity. With very high cardinality compactions, creating the reader and indirectIndex can start to use a significant amount of memory. This changes the compactions to use a reader that is already allocated and managed by the FileStore.
1 parent 739ecd2 commit 91eb9de

3 files changed

Lines changed: 73 additions & 22 deletions

File tree

tsdb/engine/tsm1/compact.go

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ var (
4141
errMaxFileExceeded = fmt.Errorf("max file exceeded")
4242
errSnapshotsDisabled = fmt.Errorf("snapshots disabled")
4343
errCompactionsDisabled = fmt.Errorf("compactions disabled")
44-
errCompactionAborted = fmt.Errorf("compaction aborted")
4544
)
4645

4746
type errCompactionInProgress struct {
@@ -56,6 +55,17 @@ func (e errCompactionInProgress) Error() string {
5655
return "compaction in progress"
5756
}
5857

58+
type errCompactionAborted struct {
59+
err error
60+
}
61+
62+
func (e errCompactionAborted) Error() string {
63+
if e.err != nil {
64+
return fmt.Sprintf("compaction aborted: %s", e.err)
65+
}
66+
return "compaction aborted"
67+
}
68+
5969
// CompactionGroup represents a list of files eligible to be compacted together.
6070
type CompactionGroup []string
6171

@@ -586,6 +596,7 @@ type Compactor struct {
586596

587597
FileStore interface {
588598
NextGeneration() int
599+
TSMReader(path string) *TSMReader
589600
}
590601

591602
mu sync.RWMutex
@@ -737,20 +748,17 @@ func (c *Compactor) compact(fast bool, tsmFiles []string) ([]string, error) {
737748
for _, file := range tsmFiles {
738749
select {
739750
case <-intC:
740-
return nil, errCompactionAborted
751+
return nil, errCompactionAborted{}
741752
default:
742753
}
743754

744-
f, err := os.Open(file)
745-
if err != nil {
746-
return nil, err
747-
}
748-
749-
tr, err := NewTSMReader(f)
750-
if err != nil {
751-
return nil, err
755+
tr := c.FileStore.TSMReader(file)
756+
if tr == nil {
757+
// This would be a bug if this occurred as tsmFiles passed in should only be
758+
// assigned to one compaction at any one time. A nil tr would mean the file
759+
// doesn't exist.
760+
return nil, errCompactionAborted{fmt.Errorf("bad plan: %s", file)}
752761
}
753-
defer tr.Close()
754762
trs = append(trs, tr)
755763
}
756764

@@ -917,7 +925,7 @@ func (c *Compactor) write(path string, iter KeyIterator) (err error) {
917925
c.mu.RUnlock()
918926

919927
if !enabled {
920-
return errCompactionAborted
928+
return errCompactionAborted{}
921929
}
922930
// Each call to read returns the next sorted key (or the prior one if there are
923931
// more values to write). The size of values will be less than or equal to our
@@ -1260,7 +1268,7 @@ func (k *tsmKeyIterator) Read() ([]byte, int64, int64, []byte, error) {
12601268
// See if compactions were disabled while we were running.
12611269
select {
12621270
case <-k.interrupt:
1263-
return nil, 0, 0, nil, errCompactionAborted
1271+
return nil, 0, 0, nil, errCompactionAborted{}
12641272
default:
12651273
}
12661274

@@ -1400,7 +1408,7 @@ func (c *cacheKeyIterator) Read() ([]byte, int64, int64, []byte, error) {
14001408
// See if snapshot compactions were disabled while we were running.
14011409
select {
14021410
case <-c.interrupt:
1403-
return nil, 0, 0, nil, errCompactionAborted
1411+
return nil, 0, 0, nil, errCompactionAborted{}
14041412
default:
14051413
}
14061414

tsdb/engine/tsm1/compact_test.go

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,11 @@ func TestCompactor_CompactFull(t *testing.T) {
116116
}
117117
f3 := MustWriteTSM(dir, 3, writes)
118118

119+
fs := &fakeFileStore{}
120+
defer fs.Close()
119121
compactor := &tsm1.Compactor{
120122
Dir: dir,
121-
FileStore: &fakeFileStore{},
123+
FileStore: fs,
122124
}
123125

124126
files, err := compactor.CompactFull([]string{f1, f2, f3})
@@ -215,9 +217,11 @@ func TestCompactor_Compact_OverlappingBlocks(t *testing.T) {
215217
}
216218
f3 := MustWriteTSM(dir, 3, writes)
217219

220+
fs := &fakeFileStore{}
221+
defer fs.Close()
218222
compactor := &tsm1.Compactor{
219223
Dir: dir,
220-
FileStore: &fakeFileStore{},
224+
FileStore: fs,
221225
Size: 2,
222226
}
223227

@@ -294,9 +298,11 @@ func TestCompactor_Compact_OverlappingBlocksMultiple(t *testing.T) {
294298
}
295299
f3 := MustWriteTSM(dir, 3, writes)
296300

301+
fs := &fakeFileStore{}
302+
defer fs.Close()
297303
compactor := &tsm1.Compactor{
298304
Dir: dir,
299-
FileStore: &fakeFileStore{},
305+
FileStore: fs,
300306
Size: 2,
301307
}
302308

@@ -365,9 +371,11 @@ func TestCompactor_CompactFull_SkipFullBlocks(t *testing.T) {
365371
}
366372
f3 := MustWriteTSM(dir, 3, writes)
367373

374+
fs := &fakeFileStore{}
375+
defer fs.Close()
368376
compactor := &tsm1.Compactor{
369377
Dir: dir,
370-
FileStore: &fakeFileStore{},
378+
FileStore: fs,
371379
Size: 2,
372380
}
373381
compactor.Open()
@@ -464,9 +472,11 @@ func TestCompactor_CompactFull_TombstonedSkipBlock(t *testing.T) {
464472
}
465473
f3 := MustWriteTSM(dir, 3, writes)
466474

475+
fs := &fakeFileStore{}
476+
defer fs.Close()
467477
compactor := &tsm1.Compactor{
468478
Dir: dir,
469-
FileStore: &fakeFileStore{},
479+
FileStore: fs,
470480
Size: 2,
471481
}
472482
compactor.Open()
@@ -564,9 +574,11 @@ func TestCompactor_CompactFull_TombstonedPartialBlock(t *testing.T) {
564574
}
565575
f3 := MustWriteTSM(dir, 3, writes)
566576

577+
fs := &fakeFileStore{}
578+
defer fs.Close()
567579
compactor := &tsm1.Compactor{
568580
Dir: dir,
569-
FileStore: &fakeFileStore{},
581+
FileStore: fs,
570582
Size: 2,
571583
}
572584
compactor.Open()
@@ -669,9 +681,11 @@ func TestCompactor_CompactFull_TombstonedMultipleRanges(t *testing.T) {
669681
}
670682
f3 := MustWriteTSM(dir, 3, writes)
671683

684+
fs := &fakeFileStore{}
685+
defer fs.Close()
672686
compactor := &tsm1.Compactor{
673687
Dir: dir,
674-
FileStore: &fakeFileStore{},
688+
FileStore: fs,
675689
Size: 2,
676690
}
677691
compactor.Open()
@@ -782,9 +796,11 @@ func TestCompactor_CompactFull_MaxKeys(t *testing.T) {
782796
}
783797
f2.Close()
784798

799+
fs := &fakeFileStore{}
800+
defer fs.Close()
785801
compactor := &tsm1.Compactor{
786802
Dir: dir,
787-
FileStore: &fakeFileStore{},
803+
FileStore: fs,
788804
}
789805
compactor.Open()
790806

@@ -2434,6 +2450,7 @@ type fakeFileStore struct {
24342450
PathsFn func() []tsm1.FileStat
24352451
lastModified time.Time
24362452
blockCount int
2453+
readers []*tsm1.TSMReader
24372454
}
24382455

24392456
func (w *fakeFileStore) Stats() []tsm1.FileStat {
@@ -2451,3 +2468,16 @@ func (w *fakeFileStore) LastModified() time.Time {
24512468
func (w *fakeFileStore) BlockCount(path string, idx int) int {
24522469
return w.blockCount
24532470
}
2471+
2472+
func (w *fakeFileStore) TSMReader(path string) *tsm1.TSMReader {
2473+
r := MustOpenTSMReader(path)
2474+
w.readers = append(w.readers, r)
2475+
return r
2476+
}
2477+
2478+
func (w *fakeFileStore) Close() {
2479+
for _, r := range w.readers {
2480+
r.Close()
2481+
}
2482+
w.readers = nil
2483+
}

tsdb/engine/tsm1/file_store.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,19 @@ func (f *FileStore) Cost(key []byte, min, max int64) query.IteratorCost {
479479
return f.cost(key, min, max)
480480
}
481481

482+
// Reader returns a TSMReader for path if one is currently managed by the FileStore.
483+
// Otherwise it returns nil.
484+
func (f *FileStore) TSMReader(path string) *TSMReader {
485+
f.mu.RLock()
486+
defer f.mu.RUnlock()
487+
for _, r := range f.files {
488+
if r.Path() == path {
489+
return r.(*TSMReader)
490+
}
491+
}
492+
return nil
493+
}
494+
482495
// KeyCursor returns a KeyCursor for key and t across the files in the FileStore.
483496
func (f *FileStore) KeyCursor(key []byte, t int64, ascending bool) *KeyCursor {
484497
f.mu.RLock()

0 commit comments

Comments
 (0)