Skip to content

Commit d3e832b

Browse files
committed
Use offheap memory for indirect index offsets slice
1 parent 91eb9de commit d3e832b

5 files changed

Lines changed: 107 additions & 25 deletions

File tree

pkg/bytesutil/bytesutil.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package bytesutil
22

33
import (
44
"bytes"
5+
"fmt"
56
"sort"
67
)
78

@@ -18,6 +19,28 @@ func SearchBytes(a [][]byte, x []byte) int {
1819
return sort.Search(len(a), func(i int) bool { return bytes.Compare(a[i], x) >= 0 })
1920
}
2021

22+
// SearchBytesFixed performs a binary search over a, which is treated as a
// sequence of fixed-width records of sz bytes each. fn is called with one
// record at a time and must report false for every record ordered before the
// target and true for the target and every record after it.
//
// The returned value is the byte offset within a (always a multiple of sz) of
// the first record for which fn reports true. Unlike sort.Search, the result is
// never len(a): if fn reports false for every record, the offset of the last
// record is returned, and 0 is returned when a is empty. The caller must
// therefore verify that the record at the returned offset is the one it wants.
//
// SearchBytesFixed panics if sz is not positive or if len(a) is not a multiple
// of sz.
func SearchBytesFixed(a []byte, sz int, fn func(x []byte) bool) int {
	// Reject a non-positive width up front: sz == 0 would otherwise surface as an
	// opaque integer-divide runtime error from the modulo below.
	if sz <= 0 {
		panic(fmt.Sprintf("record size must be positive: %d", sz))
	}
	if len(a)%sz != 0 {
		panic(fmt.Sprintf("len(a) is not a multiple of sz: %d %d", len(a), sz))
	}

	// i and j are byte offsets of the first and last candidate records.
	i, j := 0, len(a)-sz
	for i < j {
		// Midpoint computed via uint to avoid overflow, then rounded down to a
		// record boundary.
		h := int(uint(i+j) >> 1)
		h -= h % sz
		if !fn(a[h : h+sz]) {
			i = h + sz
		} else {
			j = h
		}
	}

	return i
}
43+
2144
// Union returns the union of a & b in sorted order.
2245
func Union(a, b [][]byte) [][]byte {
2346
n := len(b)

pkg/bytesutil/bytesutil_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package bytesutil_test
2+
3+
import (
4+
"bytes"
5+
"encoding/binary"
6+
"testing"
7+
8+
"github.com/influxdata/influxdb/pkg/bytesutil"
9+
)
10+
11+
func TestSearchBytesFixed(t *testing.T) {
12+
n, sz := 5, 8
13+
a := make([]byte, n*sz) // 5 - 8 byte int64s
14+
15+
for i := 0; i < 5; i++ {
16+
binary.BigEndian.PutUint64(a[i*sz:i*sz+sz], uint64(i))
17+
}
18+
19+
var x [8]byte
20+
21+
for i := 0; i < n; i++ {
22+
binary.BigEndian.PutUint64(x[:], uint64(i))
23+
if exp, got := i*sz, bytesutil.SearchBytesFixed(a, len(x), func(v []byte) bool {
24+
return bytes.Compare(v, x[:]) >= 0
25+
}); exp != got {
26+
t.Fatalf("index mismatch: exp %v, got %v", exp, got)
27+
}
28+
}
29+
30+
if exp, got := len(a)-1, bytesutil.SearchBytesFixed(a, 1, func(v []byte) bool {
31+
return bytes.Compare(v, []byte{99}) >= 0
32+
}); exp != got {
33+
t.Fatalf("index mismatch: exp %v, got %v", exp, got)
34+
}
35+
}

tsdb/engine/tsm1/compact_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ func TestCompactor_CompactFull(t *testing.T) {
162162
t.Fatalf("wrong sequence for new file: got %v, exp %v", gotSeq, expSeq)
163163
}
164164

165+
println("Open", files[0])
165166
r := MustOpenTSMReader(files[0])
166167

167168
if got, exp := r.KeyCount(), 3; got != exp {

tsdb/engine/tsm1/mmap_unix.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ import (
88
)
99

1010
func mmap(f *os.File, offset int64, length int) ([]byte, error) {
11-
mmap, err := syscall.Mmap(int(f.Fd()), 0, length, syscall.PROT_READ, syscall.MAP_SHARED)
12-
if err != nil {
13-
return nil, err
11+
// anonymous mapping
12+
if f == nil {
13+
return syscall.Mmap(-1, 0, length, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE)
1414
}
1515

16-
return mmap, nil
16+
return syscall.Mmap(int(f.Fd()), 0, length, syscall.PROT_READ, syscall.MAP_SHARED)
1717
}
1818

1919
func munmap(b []byte) (err error) {

tsdb/engine/tsm1/reader.go

Lines changed: 44 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"io"
88
"math"
99
"os"
10-
"sort"
1110
"sync"
1211
"sync/atomic"
1312

@@ -102,6 +101,9 @@ type TSMIndex interface {
102101
// UnmarshalBinary populates an index from an encoded byte slice
103102
// representation of an index.
104103
UnmarshalBinary(b []byte) error
104+
105+
// Close closes the index and releases any resources.
106+
Close() error
105107
}
106108

107109
// BlockIterator allows iterating over each block in a TSM file in order. It provides
@@ -353,7 +355,7 @@ func (t *TSMReader) Close() error {
353355
return err
354356
}
355357

356-
return nil
358+
return t.index.Close()
357359
}
358360

359361
// Ref records a usage of this TSMReader. If there are active references
@@ -598,7 +600,7 @@ type indirectIndex struct {
598600

599601
// offsets contains the positions in b for each key. It points to the 2 byte length of
600602
// key.
601-
offsets []int32
603+
offsets []byte
602604

603605
// minKey, maxKey are the minimum and maximum keys (lexicographically sorted) contained in the
604606
// file
@@ -631,9 +633,10 @@ func NewIndirectIndex() *indirectIndex {
631633
func (d *indirectIndex) search(key []byte) int {
632634
// We use a binary search across our indirect offsets (pointers to all the keys
633635
// in the index slice).
634-
i := sort.Search(len(d.offsets), func(i int) bool {
636+
i := bytesutil.SearchBytesFixed(d.offsets, 4, func(x []byte) bool {
635637
// x is the 4-byte big-endian entry of offsets being probed; decode the offset it holds
636-
offset := d.offsets[i]
638+
//offset := d.offsets[i]
639+
offset := int32(binary.BigEndian.Uint32(x))
637640

638641
// It's pointing to the start of the key which is a 2 byte length
639642
keyLen := int32(binary.BigEndian.Uint16(d.b[offset : offset+2]))
@@ -644,7 +647,7 @@ func (d *indirectIndex) search(key []byte) int {
644647

645648
// See if we might have found the right index
646649
if i < len(d.offsets) {
647-
ofs := d.offsets[i]
650+
ofs := binary.BigEndian.Uint32(d.offsets[i : i+4])
648651
_, k, err := readKey(d.b[ofs:])
649652
if err != nil {
650653
panic(fmt.Sprintf("error reading key: %v", err))
@@ -719,18 +722,19 @@ func (d *indirectIndex) Key(idx int) ([]byte, byte, []IndexEntry) {
719722
d.mu.RLock()
720723
defer d.mu.RUnlock()
721724

722-
if idx < 0 || idx >= len(d.offsets) {
725+
if idx < 0 || idx*4+4 > len(d.offsets) {
723726
return nil, 0, nil
724727
}
725-
n, key, err := readKey(d.b[d.offsets[idx]:])
728+
ofs := binary.BigEndian.Uint32(d.offsets[idx*4 : idx*4+4])
729+
n, key, err := readKey(d.b[ofs:])
726730
if err != nil {
727731
return nil, 0, nil
728732
}
729733

730-
typ := d.b[int(d.offsets[idx])+n]
734+
typ := d.b[int(ofs)+n]
731735

732736
var entries indexEntries
733-
if _, err := readEntries(d.b[int(d.offsets[idx])+n:], &entries); err != nil {
737+
if _, err := readEntries(d.b[int(ofs)+n:], &entries); err != nil {
734738
return nil, 0, nil
735739
}
736740
return key, typ, entries.entries
@@ -740,20 +744,23 @@ func (d *indirectIndex) Key(idx int) ([]byte, byte, []IndexEntry) {
740744
func (d *indirectIndex) KeyAt(idx int) ([]byte, byte) {
741745
d.mu.RLock()
742746

743-
if idx < 0 || idx >= len(d.offsets) {
747+
if idx < 0 || idx*4+4 > len(d.offsets) {
744748
d.mu.RUnlock()
745749
return nil, 0
746750
}
747-
n, key, _ := readKey(d.b[d.offsets[idx]:])
748-
typ := d.b[d.offsets[idx]+int32(n)]
751+
ofs := int32(binary.BigEndian.Uint32(d.offsets[idx*4 : idx*4+4]))
752+
753+
n, key, _ := readKey(d.b[ofs:])
754+
ofs = ofs + int32(n)
755+
typ := d.b[ofs]
749756
d.mu.RUnlock()
750757
return key, typ
751758
}
752759

753760
// KeyCount returns the count of unique keys in the index.
754761
func (d *indirectIndex) KeyCount() int {
755762
d.mu.RLock()
756-
n := len(d.offsets)
763+
n := len(d.offsets) / 4
757764
d.mu.RUnlock()
758765
return n
759766
}
@@ -773,8 +780,9 @@ func (d *indirectIndex) Delete(keys [][]byte) {
773780

774781
// Both keys and offsets are sorted. Walk both in order and skip
775782
// any keys that exist in both.
776-
offsets := make([]int32, 0, len(d.offsets))
777-
for _, offset := range d.offsets {
783+
var j int
784+
for i := 0; i+4 <= len(d.offsets); i += 4 {
785+
offset := binary.BigEndian.Uint32(d.offsets[i : i+4])
778786
_, indexKey, _ := readKey(d.b[offset:])
779787

780788
for len(keys) > 0 && bytes.Compare(keys[0], indexKey) < 0 {
@@ -786,9 +794,11 @@ func (d *indirectIndex) Delete(keys [][]byte) {
786794
continue
787795
}
788796

789-
offsets = append(offsets, int32(offset))
797+
copy(d.offsets[j:j+4], d.offsets[i:i+4])
798+
j += 4
799+
//offsets = append(offsets, int32(offset))
790800
}
791-
d.offsets = offsets
801+
d.offsets = d.offsets[:j]
792802
}
793803

794804
// DeleteRange removes the given keys with data between minTime and maxTime from the index.
@@ -945,9 +955,10 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error {
945955
// basically skips across the slice keeping track of the counter when we are at a key
946956
// field.
947957
var i int32
958+
var offsets []int32
948959
iMax := int32(len(b))
949960
for i < iMax {
950-
d.offsets = append(d.offsets, i)
961+
offsets = append(offsets, i)
951962

952963
// Skip to the start of the values
953964
// key length value (2) + type (1) + length of key
@@ -986,14 +997,14 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error {
986997
i += indexEntrySize
987998
}
988999

989-
firstOfs := d.offsets[0]
1000+
firstOfs := offsets[0]
9901001
_, key, err := readKey(b[firstOfs:])
9911002
if err != nil {
9921003
return err
9931004
}
9941005
d.minKey = key
9951006

996-
lastOfs := d.offsets[len(d.offsets)-1]
1007+
lastOfs := offsets[len(offsets)-1]
9971008
_, key, err = readKey(b[lastOfs:])
9981009
if err != nil {
9991010
return err
@@ -1003,6 +1014,14 @@ func (d *indirectIndex) UnmarshalBinary(b []byte) error {
10031014
d.minTime = minTime
10041015
d.maxTime = maxTime
10051016

1017+
d.offsets, err = mmap(nil, 0, len(offsets)*4)
1018+
if err != nil {
1019+
return err
1020+
}
1021+
for i, v := range offsets {
1022+
binary.BigEndian.PutUint32(d.offsets[i*4:i*4+4], uint32(v))
1023+
}
1024+
10061025
return nil
10071026
}
10081027

@@ -1014,6 +1033,10 @@ func (d *indirectIndex) Size() uint32 {
10141033
return uint32(len(d.b))
10151034
}
10161035

1036+
func (d *indirectIndex) Close() error {
1037+
return munmap(d.offsets)
1038+
}
1039+
10171040
// mmapAccess is mmap based block accessor. It access blocks through an
10181041
// MMAP file interface.
10191042
type mmapAccessor struct {

0 commit comments

Comments
 (0)