Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ require (
github.com/dgraph-io/ristretto v0.1.1
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da
github.com/docker/go-units v0.5.0
github.com/dolthub/swiss v0.2.1
github.com/emirpasic/gods v1.18.1
github.com/fatih/color v1.18.0
github.com/felixge/fgprof v0.9.3
Expand Down Expand Up @@ -247,7 +246,6 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/dennwc/varint v1.0.0 // indirect
github.com/dolthub/maphash v0.1.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/fatih/structtag v1.2.0
github.com/felixge/httpsnoop v1.0.4 // indirect
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -305,10 +305,6 @@ github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da h1:aIftn67I1fkbMa5
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dolthub/maphash v0.1.0 h1:bsQ7JsF4FkkWyrP3oCnFJgrCUAFbFf3kOl4L/QxPDyQ=
github.com/dolthub/maphash v0.1.0/go.mod h1:gkg4Ch4CdCDu5h6PMriVLawB7koZ+5ijb9puGMV50a4=
github.com/dolthub/swiss v0.2.1 h1:gs2osYs5SJkAaH5/ggVJqXQxRXtWshF6uE0lgR/Y3Gw=
github.com/dolthub/swiss v0.2.1/go.mod h1:8AhKZZ1HK7g18j7v7k6c5cYIGEZJcPn0ARsai8cUrh0=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
Expand Down
4 changes: 1 addition & 3 deletions pkg/statistics/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ type AnalyzeResult struct {

// DestroyAndPutToPool destroys the result and put it to the pool.
func (a *AnalyzeResult) DestroyAndPutToPool() {
for _, f := range a.Fms {
f.DestroyAndPutToPool()
}
a.Fms = nil
for _, h := range a.Hist {
h.DestroyAndPutToPool()
}
Expand Down
89 changes: 29 additions & 60 deletions pkg/statistics/fmsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ package statistics

import (
"hash"
"maps"
"sync"

"github.com/dolthub/swiss"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/types"
Expand All @@ -33,15 +33,6 @@ var murmur3Pool = sync.Pool{
},
}

var fmSketchPool = sync.Pool{
New: func() any {
return &FMSketch{
hashset: swiss.NewMap[uint64, bool](uint32(128)),
maxSize: 0,
}
},
}

// MaxSketchSize is the maximum size of the hashset in the FM sketch.
// TODO: add this attribute to PB and persist it instead of using a fixed number(executor.maxSketchSize)
const MaxSketchSize = 10000
Expand All @@ -64,7 +55,7 @@ const MaxSketchSize = 10000
// 2. https://algo.inria.fr/flajolet/Publications/FlMa85.pdf
type FMSketch struct {
// A set to store unique hashed values.
hashset *swiss.Map[uint64, bool]
hashset map[uint64]struct{}
// A binary mask used to track the maximum number of trailing zeroes in the hashed values.
// Also used to track the level of the sketch.
// Every time the size of the hashset exceeds the maximum size, the mask will be moved to the next level.
Expand All @@ -76,24 +67,22 @@ type FMSketch struct {

// NewFMSketch returns a new FM sketch.
func NewFMSketch(maxSize int) *FMSketch {
result := fmSketchPool.Get().(*FMSketch)
result.maxSize = maxSize
return result
return &FMSketch{
hashset: make(map[uint64]struct{}),
maxSize: maxSize,
}
}

// Copy makes a copy for current FMSketch.
func (s *FMSketch) Copy() *FMSketch {
if s == nil {
return nil
}
result := NewFMSketch(s.maxSize)
s.hashset.Iter(func(key uint64, value bool) bool {
result.hashset.Put(key, value)
return false
})
result.mask = s.mask
result.maxSize = s.maxSize
return result
return &FMSketch{
hashset: maps.Clone(s.hashset),
mask: s.mask,
maxSize: s.maxSize,
}
}

// NDV returns the estimated number of distinct values (NDV) in the sketch.
Expand All @@ -106,7 +95,7 @@ func (s *FMSketch) NDV() int64 {
// This is achieved by hashing the input value and counting the number of trailing zeroes in the binary representation of the hash value.
// So the count of distinct values with 'r' trailing zeroes is n / 2^r, where 'n' is the number of distinct values.
// Therefore, the estimated count of distinct values is 2^r * count = n.
return int64(s.mask+1) * int64(s.hashset.Count())
return int64(s.mask+1) * int64(len(s.hashset))
}

// insertHashValue inserts a hashed value into the sketch.
Expand All @@ -117,18 +106,15 @@ func (s *FMSketch) insertHashValue(hashVal uint64) {
return
}
// Put the hashed value into the hashset.
s.hashset.Put(hashVal, true)
s.hashset[hashVal] = struct{}{}
// We track the unique hashed values level by level to ensure a minimum count of distinct values at each level.
// This way, the final estimation is less likely to be skewed by outliers.
if s.hashset.Count() > s.maxSize {
if len(s.hashset) > s.maxSize {
// If the size of the hashset exceeds the maximum size, move the mask to the next level.
s.mask = s.mask*2 + 1
// Clean up the hashset by removing the hashed values with trailing zeroes less than the new mask.
s.hashset.Iter(func(k uint64, _ bool) (stop bool) {
if (k & s.mask) != 0 {
s.hashset.Delete(k)
}
return false
maps.DeleteFunc(s.hashset, func(k uint64, _ struct{}) bool {
return (k & s.mask) != 0
})
}
}
Expand Down Expand Up @@ -182,28 +168,23 @@ func (s *FMSketch) MergeFMSketch(rs *FMSketch) {
}
if s.mask < rs.mask {
s.mask = rs.mask
s.hashset.Iter(func(key uint64, _ bool) bool {
if (key & s.mask) != 0 {
s.hashset.Delete(key)
}
return false
maps.DeleteFunc(s.hashset, func(k uint64, _ struct{}) bool {
return (k & s.mask) != 0
})
}
rs.hashset.Iter(func(key uint64, _ bool) bool {
for key := range rs.hashset {
s.insertHashValue(key)
return false
})
}
}

// FMSketchToProto converts FMSketch to its protobuf representation.
func FMSketchToProto(s *FMSketch) *tipb.FMSketch {
protoSketch := new(tipb.FMSketch)
if s != nil {
protoSketch.Mask = s.mask
s.hashset.Iter(func(val uint64, _ bool) bool {
for val := range s.hashset {
protoSketch.Hashset = append(protoSketch.Hashset, val)
return false
})
}
}
return protoSketch
}
Expand All @@ -213,10 +194,12 @@ func FMSketchFromProto(protoSketch *tipb.FMSketch) *FMSketch {
if protoSketch == nil {
return nil
}
sketch := fmSketchPool.Get().(*FMSketch)
sketch.mask = protoSketch.Mask
sketch := &FMSketch{
hashset: make(map[uint64]struct{}, len(protoSketch.Hashset)),
mask: protoSketch.Mask,
}
for _, val := range protoSketch.Hashset {
sketch.hashset.Put(val, true)
sketch.hashset[val] = struct{}{}
}
return sketch
}
Expand Down Expand Up @@ -249,22 +232,8 @@ func DecodeFMSketch(data []byte) (*FMSketch, error) {
// MemoryUsage returns the total memory usage of a FMSketch.
func (s *FMSketch) MemoryUsage() (sum int64) {
// As for the variables mask(uint64) and maxSize(int) each will consume 8 bytes. This is the origin of the constant 16.
// And for the variables hashset(map[uint64]bool), each element in map will consume 9 bytes(8[uint64] + 1[bool]).
sum = int64(16 + 9*s.hashset.Count())
// And for the variables hashset(map[uint64]struct{}), each element in map will consume 8 bytes(uint64 key).
sum = int64(16 + 8*len(s.hashset))
return
}

func (s *FMSketch) reset() {
s.hashset.Clear()
s.mask = 0
s.maxSize = 0
}

// DestroyAndPutToPool resets the FMSketch and puts it to the pool.
func (s *FMSketch) DestroyAndPutToPool() {
if s == nil {
return
}
s.reset()
fmSketchPool.Put(s)
}
14 changes: 7 additions & 7 deletions pkg/statistics/fmsketch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ func SubTestSketch() func(*testing.T) {
sketch := NewFMSketch(maxSize)
sketch.insertHashValue(1)
sketch.insertHashValue(2)
require.Equal(t, maxSize, sketch.hashset.Count())
require.Equal(t, maxSize, len(sketch.hashset))
sketch.insertHashValue(4)
require.LessOrEqual(t, maxSize, sketch.hashset.Count())
require.LessOrEqual(t, maxSize, len(sketch.hashset))
}
}

Expand All @@ -87,11 +87,11 @@ func SubTestSketchProtoConversion() func(*testing.T) {
p := FMSketchToProto(sampleSketch)
f := FMSketchFromProto(p)
require.Equal(t, f.mask, sampleSketch.mask)
require.Equal(t, f.hashset.Count(), sampleSketch.hashset.Count())
sampleSketch.hashset.Iter(func(key uint64, _ bool) bool {
require.True(t, f.hashset.Has(key))
return false
})
require.Equal(t, len(f.hashset), len(sampleSketch.hashset))
for key := range sampleSketch.hashset {
_, ok := f.hashset[key]
require.True(t, ok)
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/statistics/handle/globalstats/global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,13 +312,13 @@ func blockingMergePartitionStats2GlobalStats(
globalStats.Fms[i] = allFms[i][j]
} else {
globalStats.Fms[i].MergeFMSketch(allFms[i][j])
allFms[i][j].DestroyAndPutToPool()
allFms[i][j] = nil
}
}

// Update the global NDV.
globalStatsNDV := min(globalStats.Fms[i].NDV(), globalStats.Count)
globalStats.Fms[i].DestroyAndPutToPool()
globalStats.Fms[i] = nil

// Merge CMSketch.
globalStats.Cms[i] = allCms[i][0]
Expand Down
2 changes: 1 addition & 1 deletion pkg/statistics/handle/globalstats/global_stats_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (a *AsyncMergePartitionStats2GlobalStats) cpuWorker(stmtCtx *stmtctx.Statem
// Update the global NDV.
globalStatsNDV := min(a.globalStats.Fms[i].NDV(), a.globalStats.Count)
a.globalStatsNDV = append(a.globalStatsNDV, globalStatsNDV)
a.globalStats.Fms[i].DestroyAndPutToPool()
a.globalStats.Fms[i] = nil
}
}
err = a.dealCMSketch()
Expand Down
2 changes: 1 addition & 1 deletion pkg/statistics/handle/storage/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func GenJSONTableFromStats(
return true
}
jsonTbl.Columns[col.Info.Name.L] = proto
col.FMSketch.DestroyAndPutToPool()
col.FMSketch = nil
hist.DestroyAndPutToPool()
return false
})
Expand Down
4 changes: 1 addition & 3 deletions pkg/statistics/row_sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,7 @@ func (s *RowSampleBuilder) Collect() (RowSampleCollector, error) {
}

func (s *baseCollector) destroyAndPutToPool() {
for _, sketch := range s.FMSketches {
sketch.DestroyAndPutToPool()
}
s.FMSketches = nil
}

func (s *baseCollector) collectColumns(sc *stmtctx.StatementContext, cols []types.Datum, sizes []int64) error {
Expand Down
6 changes: 0 additions & 6 deletions pkg/statistics/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,13 +960,7 @@ func (t *Table) IsOutdated() bool {

// ReleaseAndPutToPool releases data structures of Table and put itself back to pool.
func (t *Table) ReleaseAndPutToPool() {
for _, col := range t.columns {
col.FMSketch.DestroyAndPutToPool()
}
clear(t.columns)
for _, idx := range t.indices {
idx.FMSketch.DestroyAndPutToPool()
}
clear(t.indices)
}

Expand Down