Skip to content

Commit b82ed77

Browse files
[IMPROVED] PurgeEx only loads within subject range
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
1 parent 74cbb96 commit b82ed77

File tree

2 files changed

+93
-0
lines changed

2 files changed

+93
-0
lines changed

server/filestore.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9564,10 +9564,41 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
95649564
fs.mu.Unlock()
95659565
return purged, err
95669566
}
9567+
9568+
var start, stop uint32
9569+
9570+
// If literal subject check for presence.
9571+
if wc {
9572+
start = fs.lmb.index
9573+
fs.psim.Match(stringToBytes(subject), func(_ []byte, psi *psi) {
9574+
// Keep track of start and stop indexes for this subject.
9575+
if psi.fblk < start {
9576+
start = psi.fblk
9577+
}
9578+
if psi.lblk > stop {
9579+
stop = psi.lblk
9580+
}
9581+
})
9582+
// None matched.
9583+
if stop == 0 {
9584+
fs.mu.Unlock()
9585+
return purged, nil
9586+
}
9587+
} else if info, ok := fs.psim.Find(stringToBytes(subject)); ok {
9588+
start, stop = info.fblk, info.lblk
9589+
} else {
9590+
fs.mu.Unlock()
9591+
return purged, nil
9592+
}
9593+
95679594
// We may remove blocks as we purge, so don't range directly on fs.blks
95689595
// otherwise we may jump over some (see https://github.com/nats-io/nats-server/issues/3528)
95699596
for i := 0; i < len(fs.blks); i++ {
95709597
mb := fs.blks[i]
9598+
// Skip if not within our range for the purge subject.
9599+
if mb.index < start || mb.index > stop {
9600+
continue
9601+
}
95719602
mb.mu.Lock()
95729603

95739604
// If we do not have our fss, try to expire the cache if we have no items in this block.

server/filestore_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13574,3 +13574,65 @@ func TestFileStoreNoDirectoryNotEmptyError(t *testing.T) {
1357413574
wg.Wait()
1357513575
}
1357613576
}
13577+
13578+
func TestFileStoreDontLoadSubjectStateIfNotPurged(t *testing.T) {
13579+
fcfg := FileStoreConfig{StoreDir: t.TempDir()}
13580+
cfg := StreamConfig{Name: "zzz", Storage: FileStorage, Subjects: []string{">"}}
13581+
fs, err := newFileStore(fcfg, cfg)
13582+
require_NoError(t, err)
13583+
defer fs.Stop()
13584+
13585+
_, _, err = fs.StoreMsg("foo", nil, nil, 0)
13586+
require_NoError(t, err)
13587+
13588+
fmb := fs.getFirstBlock()
13589+
fmb.mu.Lock()
13590+
fmb.fss = nil
13591+
fmb.mu.Unlock()
13592+
13593+
// When purging a subject that doesn't exist, we shouldn't need to load subjects for any blocks.
13594+
purged, err := fs.PurgeEx("bar", 0, 0)
13595+
require_NoError(t, err)
13596+
require_Equal(t, purged, 0)
13597+
13598+
fmb.mu.RLock()
13599+
defer fmb.mu.RUnlock()
13600+
require_True(t, fmb.fss == nil)
13601+
}
13602+
13603+
func TestFileStoreOnlyLoadSubjectStateOnBlocksWithinInterestRange(t *testing.T) {
13604+
fcfg := FileStoreConfig{StoreDir: t.TempDir()}
13605+
cfg := StreamConfig{Name: "zzz", Storage: FileStorage, Subjects: []string{">"}}
13606+
fs, err := newFileStore(fcfg, cfg)
13607+
require_NoError(t, err)
13608+
defer fs.Stop()
13609+
13610+
_, _, err = fs.StoreMsg("foo", nil, nil, 0)
13611+
require_NoError(t, err)
13612+
_, err = fs.newMsgBlockForWrite()
13613+
require_NoError(t, err)
13614+
_, _, err = fs.StoreMsg("bar", nil, nil, 0)
13615+
require_NoError(t, err)
13616+
13617+
fs.mu.RLock()
13618+
for _, mb := range fs.blks {
13619+
mb.mu.Lock()
13620+
mb.fss = nil
13621+
mb.mu.Unlock()
13622+
}
13623+
fs.mu.RUnlock()
13624+
13625+
// When purging a subject that only exists on a subset, we should only load blocks within its range.
13626+
purged, err := fs.PurgeEx("bar", 0, 0)
13627+
require_NoError(t, err)
13628+
require_Equal(t, purged, 1)
13629+
13630+
fs.mu.RLock()
13631+
defer fs.mu.RUnlock()
13632+
for i, mb := range fs.blks {
13633+
mb.mu.Lock()
13634+
loadedFss := mb.fss != nil
13635+
mb.mu.Unlock()
13636+
require_Equal(t, loadedFss, i == 1)
13637+
}
13638+
}

0 commit comments

Comments
 (0)