Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 0 additions & 26 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -2331,32 +2331,6 @@ def go_deps():
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/docopt/docopt-go/com_github_docopt_docopt_go-v0.0.0-20180111231733-ee0de3bc6815.zip",
],
)
go_repository(
name = "com_github_dolthub_maphash",
build_file_proto_mode = "disable_global",
importpath = "github.com/dolthub/maphash",
sha256 = "ba69ef526a9613cb059c8490c1a4f032649879c316a1c4305e2355815eb32e41",
strip_prefix = "github.com/dolthub/maphash@v0.1.0",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/dolthub/maphash/com_github_dolthub_maphash-v0.1.0.zip",
"http://ats.apps.svc/gomod/github.com/dolthub/maphash/com_github_dolthub_maphash-v0.1.0.zip",
"https://cache.hawkingrei.com/gomod/github.com/dolthub/maphash/com_github_dolthub_maphash-v0.1.0.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/dolthub/maphash/com_github_dolthub_maphash-v0.1.0.zip",
],
)
go_repository(
name = "com_github_dolthub_swiss",
build_file_proto_mode = "disable_global",
importpath = "github.com/dolthub/swiss",
sha256 = "e911b7cea9aaed1255544fb8b53c19780f91b713e6d0fc71fb310232e4800dcc",
strip_prefix = "github.com/dolthub/swiss@v0.2.1",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/dolthub/swiss/com_github_dolthub_swiss-v0.2.1.zip",
"http://ats.apps.svc/gomod/github.com/dolthub/swiss/com_github_dolthub_swiss-v0.2.1.zip",
"https://cache.hawkingrei.com/gomod/github.com/dolthub/swiss/com_github_dolthub_swiss-v0.2.1.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/dolthub/swiss/com_github_dolthub_swiss-v0.2.1.zip",
],
)
go_repository(
name = "com_github_dustin_go_humanize",
build_file_proto_mode = "disable_global",
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ require (
github.com/dgraph-io/ristretto v0.1.1
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da
github.com/docker/go-units v0.5.0
github.com/dolthub/swiss v0.2.1
github.com/emirpasic/gods v1.18.1
github.com/fatih/color v1.18.0
github.com/felixge/fgprof v0.9.3
Expand Down Expand Up @@ -247,7 +246,6 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/dennwc/varint v1.0.0 // indirect
github.com/dolthub/maphash v0.1.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/fatih/structtag v1.2.0
github.com/felixge/httpsnoop v1.0.4 // indirect
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -305,10 +305,6 @@ github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da h1:aIftn67I1fkbMa5
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dolthub/maphash v0.1.0 h1:bsQ7JsF4FkkWyrP3oCnFJgrCUAFbFf3kOl4L/QxPDyQ=
github.com/dolthub/maphash v0.1.0/go.mod h1:gkg4Ch4CdCDu5h6PMriVLawB7koZ+5ijb9puGMV50a4=
github.com/dolthub/swiss v0.2.1 h1:gs2osYs5SJkAaH5/ggVJqXQxRXtWshF6uE0lgR/Y3Gw=
github.com/dolthub/swiss v0.2.1/go.mod h1:8AhKZZ1HK7g18j7v7k6c5cYIGEZJcPn0ARsai8cUrh0=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
Expand Down
1 change: 0 additions & 1 deletion pkg/statistics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ go_library(
"//pkg/util/memory",
"//pkg/util/ranger",
"//pkg/util/sqlexec",
"@com_github_dolthub_swiss//:swiss",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_tipb//go-tipb",
Expand Down
4 changes: 1 addition & 3 deletions pkg/statistics/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ type AnalyzeResult struct {

// DestroyAndPutToPool destroys the result and put it to the pool.
func (a *AnalyzeResult) DestroyAndPutToPool() {
for _, f := range a.Fms {
f.DestroyAndPutToPool()
}
a.Fms = nil // Release for GC.
for _, h := range a.Hist {
h.DestroyAndPutToPool()
}
Expand Down
96 changes: 35 additions & 61 deletions pkg/statistics/fmsketch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ package statistics

import (
"hash"
"maps"
"sync"

"github.com/dolthub/swiss"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/types"
Expand All @@ -33,15 +33,6 @@ var murmur3Pool = sync.Pool{
},
}

var fmSketchPool = sync.Pool{
New: func() any {
return &FMSketch{
hashset: swiss.NewMap[uint64, bool](uint32(128)),
maxSize: 0,
}
},
}

// MaxSketchSize is the maximum size of the hashset in the FM sketch.
// TODO: add this attribute to PB and persist it instead of using a fixed number(executor.maxSketchSize)
const MaxSketchSize = 10000
Expand All @@ -64,7 +55,7 @@ const MaxSketchSize = 10000
// 2. https://algo.inria.fr/flajolet/Publications/FlMa85.pdf
type FMSketch struct {
// A set to store unique hashed values.
hashset *swiss.Map[uint64, bool]
hashset map[uint64]struct{}
// A binary mask used to track the maximum number of trailing zeroes in the hashed values.
// Also used to track the level of the sketch.
// Every time the size of the hashset exceeds the maximum size, the mask will be moved to the next level.
Expand All @@ -76,24 +67,28 @@ type FMSketch struct {

// NewFMSketch returns a new FM sketch.
func NewFMSketch(maxSize int) *FMSketch {
result := fmSketchPool.Get().(*FMSketch)
result.maxSize = maxSize
return result
initialSize := maxSize
if initialSize > 128 {
initialSize = 128
} else if initialSize < 0 {
initialSize = 0
}
return &FMSketch{
hashset: make(map[uint64]struct{}, initialSize),
maxSize: maxSize,
}
}

// Copy makes a copy for current FMSketch.
func (s *FMSketch) Copy() *FMSketch {
if s == nil {
return nil
}
result := NewFMSketch(s.maxSize)
s.hashset.Iter(func(key uint64, value bool) bool {
result.hashset.Put(key, value)
return false
})
result.mask = s.mask
result.maxSize = s.maxSize
return result
return &FMSketch{
hashset: maps.Clone(s.hashset),
mask: s.mask,
maxSize: s.maxSize,
}
}

// NDV returns the estimated number of distinct values (NDV) in the sketch.
Expand All @@ -106,7 +101,7 @@ func (s *FMSketch) NDV() int64 {
// This is achieved by hashing the input value and counting the number of trailing zeroes in the binary representation of the hash value.
// So the count of distinct values with 'r' trailing zeroes is n / 2^r, where 'n' is the number of distinct values.
// Therefore, the estimated count of distinct values is 2^r * count = n.
return int64(s.mask+1) * int64(s.hashset.Count())
return int64(s.mask+1) * int64(len(s.hashset))
}

// insertHashValue inserts a hashed value into the sketch.
Expand All @@ -117,18 +112,15 @@ func (s *FMSketch) insertHashValue(hashVal uint64) {
return
}
// Put the hashed value into the hashset.
s.hashset.Put(hashVal, true)
s.hashset[hashVal] = struct{}{}
// We track the unique hashed values level by level to ensure a minimum count of distinct values at each level.
// This way, the final estimation is less likely to be skewed by outliers.
if s.hashset.Count() > s.maxSize {
if len(s.hashset) > s.maxSize {
// If the size of the hashset exceeds the maximum size, move the mask to the next level.
s.mask = s.mask*2 + 1
// Clean up the hashset by removing the hashed values with trailing zeroes less than the new mask.
s.hashset.Iter(func(k uint64, _ bool) (stop bool) {
if (k & s.mask) != 0 {
s.hashset.Delete(k)
}
return false
maps.DeleteFunc(s.hashset, func(k uint64, _ struct{}) bool {
return (k & s.mask) != 0
})
}
}
Expand Down Expand Up @@ -182,28 +174,23 @@ func (s *FMSketch) MergeFMSketch(rs *FMSketch) {
}
if s.mask < rs.mask {
s.mask = rs.mask
s.hashset.Iter(func(key uint64, _ bool) bool {
if (key & s.mask) != 0 {
s.hashset.Delete(key)
}
return false
maps.DeleteFunc(s.hashset, func(k uint64, _ struct{}) bool {
return (k & s.mask) != 0
})
}
rs.hashset.Iter(func(key uint64, _ bool) bool {
for key := range rs.hashset {
s.insertHashValue(key)
return false
})
}
}

// FMSketchToProto converts FMSketch to its protobuf representation.
func FMSketchToProto(s *FMSketch) *tipb.FMSketch {
protoSketch := new(tipb.FMSketch)
if s != nil {
protoSketch.Mask = s.mask
s.hashset.Iter(func(val uint64, _ bool) bool {
for val := range s.hashset {
protoSketch.Hashset = append(protoSketch.Hashset, val)
return false
})
}
}
return protoSketch
}
Expand All @@ -213,10 +200,12 @@ func FMSketchFromProto(protoSketch *tipb.FMSketch) *FMSketch {
if protoSketch == nil {
return nil
}
sketch := fmSketchPool.Get().(*FMSketch)
sketch.mask = protoSketch.Mask
sketch := &FMSketch{
hashset: make(map[uint64]struct{}, len(protoSketch.Hashset)),
mask: protoSketch.Mask,
}
for _, val := range protoSketch.Hashset {
sketch.hashset.Put(val, true)
sketch.hashset[val] = struct{}{}
}
return sketch
}
Expand Down Expand Up @@ -249,22 +238,7 @@ func DecodeFMSketch(data []byte) (*FMSketch, error) {
// MemoryUsage returns the total memory usage of a FMSketch.
func (s *FMSketch) MemoryUsage() (sum int64) {
// As for the variables mask(uint64) and maxSize(int) each will consume 8 bytes. This is the origin of the constant 16.
// And for the variables hashset(map[uint64]bool), each element in map will consume 9 bytes(8[uint64] + 1[bool]).
sum = int64(16 + 9*s.hashset.Count())
// And for the variables hashset(map[uint64]struct{}), we estimate 8 bytes per entry (key size only, excluding Go map overhead).
sum = int64(16 + 8*len(s.hashset))
return
}

func (s *FMSketch) reset() {
s.hashset.Clear()
s.mask = 0
s.maxSize = 0
}

// DestroyAndPutToPool resets the FMSketch and puts it to the pool.
func (s *FMSketch) DestroyAndPutToPool() {
if s == nil {
return
}
s.reset()
fmSketchPool.Put(s)
}
14 changes: 7 additions & 7 deletions pkg/statistics/fmsketch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ func SubTestSketch() func(*testing.T) {
sketch := NewFMSketch(maxSize)
sketch.insertHashValue(1)
sketch.insertHashValue(2)
require.Equal(t, maxSize, sketch.hashset.Count())
require.Equal(t, maxSize, len(sketch.hashset))
sketch.insertHashValue(4)
require.LessOrEqual(t, maxSize, sketch.hashset.Count())
require.LessOrEqual(t, maxSize, len(sketch.hashset))
}
}

Expand All @@ -87,11 +87,11 @@ func SubTestSketchProtoConversion() func(*testing.T) {
p := FMSketchToProto(sampleSketch)
f := FMSketchFromProto(p)
require.Equal(t, f.mask, sampleSketch.mask)
require.Equal(t, f.hashset.Count(), sampleSketch.hashset.Count())
sampleSketch.hashset.Iter(func(key uint64, _ bool) bool {
require.True(t, f.hashset.Has(key))
return false
})
require.Equal(t, len(f.hashset), len(sampleSketch.hashset))
for key := range sampleSketch.hashset {
_, ok := f.hashset[key]
require.True(t, ok)
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/statistics/handle/globalstats/global_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,13 +312,13 @@ func blockingMergePartitionStats2GlobalStats(
globalStats.Fms[i] = allFms[i][j]
} else {
globalStats.Fms[i].MergeFMSketch(allFms[i][j])
allFms[i][j].DestroyAndPutToPool()
allFms[i][j] = nil // Release for GC.
}
}

// Update the global NDV.
globalStatsNDV := min(globalStats.Fms[i].NDV(), globalStats.Count)
globalStats.Fms[i].DestroyAndPutToPool()
globalStats.Fms[i] = nil // Release for GC.

// Merge CMSketch.
globalStats.Cms[i] = allCms[i][0]
Expand Down
2 changes: 1 addition & 1 deletion pkg/statistics/handle/globalstats/global_stats_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (a *AsyncMergePartitionStats2GlobalStats) cpuWorker(stmtCtx *stmtctx.Statem
// Update the global NDV.
globalStatsNDV := min(a.globalStats.Fms[i].NDV(), a.globalStats.Count)
a.globalStatsNDV = append(a.globalStatsNDV, globalStatsNDV)
a.globalStats.Fms[i].DestroyAndPutToPool()
a.globalStats.Fms[i] = nil // Release for GC.
}
}
err = a.dealCMSketch()
Expand Down
2 changes: 1 addition & 1 deletion pkg/statistics/handle/storage/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func GenJSONTableFromStats(
return true
}
jsonTbl.Columns[col.Info.Name.L] = proto
col.FMSketch.DestroyAndPutToPool()
col.FMSketch = nil // Release for GC.
hist.DestroyAndPutToPool()
return false
})
Expand Down
4 changes: 1 addition & 3 deletions pkg/statistics/row_sampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,7 @@ func (s *RowSampleBuilder) Collect() (RowSampleCollector, error) {
}

func (s *baseCollector) destroyAndPutToPool() {
for _, sketch := range s.FMSketches {
sketch.DestroyAndPutToPool()
}
s.FMSketches = nil // Release for GC.
}

func (s *baseCollector) collectColumns(sc *stmtctx.StatementContext, cols []types.Datum, sizes []int64) error {
Expand Down
12 changes: 0 additions & 12 deletions pkg/statistics/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -958,18 +958,6 @@ func (t *Table) IsOutdated() bool {
return false
}

// ReleaseAndPutToPool releases data structures of Table and put itself back to pool.
func (t *Table) ReleaseAndPutToPool() {
for _, col := range t.columns {
col.FMSketch.DestroyAndPutToPool()
}
clear(t.columns)
for _, idx := range t.indices {
idx.FMSketch.DestroyAndPutToPool()
}
clear(t.indices)
}

// ID2UniqueID generates a new HistColl whose `Columns` is built from UniqueID of given columns.
func (coll *HistColl) ID2UniqueID(columns []*expression.Column) *HistColl {
cols := make(map[int64]*Column)
Expand Down