Skip to content

Commit 807d653

Browse files
committed
Complete filtered LoadPrevMsg implementation
Signed-off-by: Neil Twigg <neil@nats.io>
1 parent c971de0 commit 807d653

5 files changed

Lines changed: 209 additions & 49 deletions

File tree

server/filestore.go

Lines changed: 156 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -8515,58 +8515,182 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
85158515
return nil, fs.state.LastSeq, ErrStoreEOF
85168516
}
85178517

8518-
// Will load the next non-deleted msg starting at the start sequence and walking backwards.
8519-
func (fs *fileStore) LoadPrevMsg(start uint64, smp *StoreMsg) (sm *StoreMsg, err error) {
8518+
// Find the previous matching message.
8519+
// fs lock should be held.
8520+
func (mb *msgBlock) prevMatching(filter string, wc bool, start uint64, sm *StoreMsg) (*StoreMsg, bool, error) {
8521+
mb.mu.Lock()
8522+
var updateLLTS bool
8523+
defer func() {
8524+
if updateLLTS {
8525+
mb.llts = ats.AccessTime()
8526+
}
8527+
mb.finishedWithCache()
8528+
mb.mu.Unlock()
8529+
}()
8530+
8531+
end, isAll := start, filter == _EMPTY_ || filter == fwcs
8532+
8533+
var didLoad bool
8534+
if mb.fssNotLoaded() {
8535+
if err := mb.loadMsgsWithLock(); err != nil {
8536+
return nil, false, err
8537+
}
8538+
didLoad = true
8539+
}
8540+
mb.lsts = ats.AccessTime()
8541+
8542+
if filter == _EMPTY_ {
8543+
filter = fwcs
8544+
wc = true
8545+
}
8546+
8547+
if !isAll && mb.fss.Size() == 1 {
8548+
if !wc {
8549+
_, isAll = mb.fss.Find(stringToBytes(filter))
8550+
} else {
8551+
mb.fss.Match(stringToBytes(filter), func(subject []byte, _ *SimpleState) {
8552+
isAll = true
8553+
})
8554+
}
8555+
if !isAll {
8556+
return nil, didLoad, ErrStoreMsgNotFound
8557+
}
8558+
}
8559+
8560+
lseq := atomic.LoadUint64(&mb.first.seq)
8561+
end = min(end, atomic.LoadUint64(&mb.last.seq))
8562+
8563+
var isMatch func(subj string) bool
8564+
if wc {
8565+
_tsa, _fsa := [32]string{}, [32]string{}
8566+
tsa, fsa := _tsa[:0], tokenizeSubjectIntoSlice(_fsa[:0], filter)
8567+
isMatch = func(subj string) bool {
8568+
tsa = tokenizeSubjectIntoSlice(tsa[:0], subj)
8569+
return isSubsetMatchTokenized(tsa, fsa)
8570+
}
8571+
}
8572+
8573+
subjs := mb.fs.cfg.Subjects
8574+
doLinearScan := isAll || (wc && len(subjs) == 1 && subjs[0] == filter)
8575+
if !doLinearScan && wc && mb.cacheAlreadyLoaded() {
8576+
doLinearScan = mb.fss.Size()*4 > int(end-lseq)
8577+
}
8578+
8579+
if !doLinearScan {
8580+
var found bool
8581+
var first, last uint64
8582+
if bfilter := stringToBytes(filter); wc {
8583+
var ierr error
8584+
mb.fss.Match(bfilter, func(bsubj []byte, ss *SimpleState) {
8585+
if ierr != nil {
8586+
return
8587+
}
8588+
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
8589+
mb.recalculateForSubj(bytesToString(bsubj), ss)
8590+
}
8591+
if end < ss.First {
8592+
return
8593+
}
8594+
if !found {
8595+
found = true
8596+
first, last = ss.First, min(end, ss.Last)
8597+
return
8598+
}
8599+
first = min(first, ss.First)
8600+
last = max(last, min(end, ss.Last))
8601+
})
8602+
if ierr != nil {
8603+
return nil, false, ierr
8604+
}
8605+
} else if ss, _ := mb.fss.Find(bfilter); ss != nil {
8606+
if ss.firstNeedsUpdate || ss.lastNeedsUpdate {
8607+
mb.recalculateForSubj(filter, ss)
8608+
}
8609+
if end >= ss.First {
8610+
found = true
8611+
first, last = ss.First, min(end, ss.Last)
8612+
}
8613+
}
8614+
if !found || first > last {
8615+
return nil, didLoad, ErrStoreMsgNotFound
8616+
}
8617+
lseq, end = max(lseq, first), last
8618+
}
8619+
8620+
if mb.cacheNotLoaded() {
8621+
if err := mb.loadMsgsWithLock(); err != nil {
8622+
return nil, false, err
8623+
}
8624+
didLoad = true
8625+
}
8626+
8627+
if sm == nil {
8628+
sm = new(StoreMsg)
8629+
}
8630+
8631+
for seq := end; seq >= lseq; seq-- {
8632+
if mb.dmap.Exists(seq) {
8633+
updateLLTS = true
8634+
continue
8635+
}
8636+
llseq := mb.llseq
8637+
fsm, err := mb.cacheLookup(seq, sm)
8638+
if err != nil {
8639+
if err == errPartialCache || err == errNoCache {
8640+
return nil, false, err
8641+
}
8642+
continue
8643+
}
8644+
updateLLTS = false
8645+
expireOk := seq == lseq && mb.llseq != llseq && mb.llseq == seq
8646+
if isAll {
8647+
return fsm, expireOk, nil
8648+
}
8649+
if wc && isMatch(sm.subj) {
8650+
return fsm, expireOk, nil
8651+
} else if !wc && fsm.subj == filter {
8652+
return fsm, expireOk, nil
8653+
}
8654+
mb.llseq = llseq
8655+
}
8656+
8657+
return nil, didLoad, ErrStoreMsgNotFound
8658+
}
8659+
8660+
// Will load the previous message matching the filter subject, starting at the start sequence and walking backwards.
8661+
func (fs *fileStore) LoadPrevMsg(filter string, wc bool, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error) {
85208662
if fs.isClosed() {
8521-
return nil, ErrStoreClosed
8663+
return nil, 0, ErrStoreClosed
85228664
}
85238665

85248666
fs.mu.RLock()
85258667
defer fs.mu.RUnlock()
85268668

85278669
if fs.state.Msgs == 0 || start < fs.state.FirstSeq {
8528-
return nil, ErrStoreEOF
8670+
return nil, fs.state.FirstSeq, ErrStoreEOF
85298671
}
85308672

85318673
if start > fs.state.LastSeq {
85328674
start = fs.state.LastSeq
85338675
}
8534-
if smp == nil {
8535-
smp = new(StoreMsg)
8536-
}
85378676

85388677
if bi, _ := fs.selectMsgBlockWithIndex(start); bi >= 0 {
85398678
for i := bi; i >= 0; i-- {
85408679
mb := fs.blks[i]
8541-
mb.mu.Lock()
8542-
// Need messages loaded from here on out.
8543-
if mb.cacheNotLoaded() {
8544-
if err := mb.loadMsgsWithLock(); err != nil {
8545-
mb.mu.Unlock()
8546-
return nil, err
8547-
}
8548-
}
8549-
8550-
lseq, fseq := atomic.LoadUint64(&mb.last.seq), atomic.LoadUint64(&mb.first.seq)
8551-
if start > lseq {
8552-
start = lseq
8553-
}
8554-
for seq := start; seq >= fseq; seq-- {
8555-
if mb.dmap.Exists(seq) {
8556-
continue
8557-
}
8558-
if sm, err := mb.cacheLookup(seq, smp); err == nil {
8559-
mb.finishedWithCache()
8560-
mb.mu.Unlock()
8561-
return sm, nil
8680+
if sm, expireOk, err := mb.prevMatching(filter, wc, start, smp); err == nil {
8681+
if expireOk {
8682+
mb.tryForceExpireCache()
85628683
}
8684+
return sm, sm.seq, nil
8685+
} else if err != ErrStoreMsgNotFound {
8686+
return nil, 0, err
8687+
} else if expireOk {
8688+
mb.tryForceExpireCache()
85638689
}
8564-
mb.finishedWithCache()
8565-
mb.mu.Unlock()
85668690
}
85678691
}
85688692

8569-
return nil, ErrStoreEOF
8693+
return nil, fs.state.FirstSeq, ErrStoreEOF
85708694
}
85718695

85728696
// LoadPrevMsgMulti will find the previous message matching any entry in the sublist.
@@ -8576,15 +8700,7 @@ func (fs *fileStore) LoadPrevMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *
85768700
}
85778701

85788702
if sl == nil || sl.MatchesFullWildcard() {
8579-
if sm, err = fs.LoadPrevMsg(start, smp); err == nil {
8580-
return sm, sm.seq, nil
8581-
}
8582-
if err == ErrStoreEOF {
8583-
fs.mu.RLock()
8584-
defer fs.mu.RUnlock()
8585-
return nil, fs.state.FirstSeq, err
8586-
}
8587-
return nil, 0, err
8703+
return fs.LoadPrevMsg(_EMPTY_, false, start, smp)
85888704
}
85898705
fs.mu.RLock()
85908706
defer fs.mu.RUnlock()

server/memstore.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1907,31 +1907,41 @@ func (ms *memStore) loadNextMsgLocked(filter string, wc bool, start uint64, smp
19071907
return nil, ms.state.LastSeq, ErrStoreEOF
19081908
}
19091909

1910-
// Will load the next non-deleted msg starting at the start sequence and walking backwards.
1911-
func (ms *memStore) LoadPrevMsg(start uint64, smp *StoreMsg) (sm *StoreMsg, err error) {
1910+
// Will load the previous message matching the filter subject, starting at the start sequence and walking backwards.
1911+
func (ms *memStore) LoadPrevMsg(filter string, wc bool, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error) {
19121912
ms.mu.RLock()
19131913
defer ms.mu.RUnlock()
19141914

19151915
if ms.msgs == nil {
1916-
return nil, ErrStoreClosed
1916+
return nil, 0, ErrStoreClosed
19171917
}
19181918
if ms.state.Msgs == 0 || start < ms.state.FirstSeq {
1919-
return nil, ErrStoreEOF
1919+
return nil, ms.state.FirstSeq, ErrStoreEOF
19201920
}
19211921
if start > ms.state.LastSeq {
19221922
start = ms.state.LastSeq
19231923
}
19241924

1925+
if filter == _EMPTY_ {
1926+
filter = fwcs
1927+
wc = true
1928+
}
1929+
isAll := filter == fwcs
1930+
eq := subjectsEqual
1931+
if wc {
1932+
eq = matchLiteral
1933+
}
1934+
19251935
for seq := start; seq >= ms.state.FirstSeq; seq-- {
1926-
if sm, ok := ms.msgs[seq]; ok {
1936+
if sm, ok := ms.msgs[seq]; ok && (isAll || eq(sm.subj, filter)) {
19271937
if smp == nil {
19281938
smp = new(StoreMsg)
19291939
}
19301940
sm.copy(smp)
1931-
return smp, nil
1941+
return smp, seq, nil
19321942
}
19331943
}
1934-
return nil, ErrStoreEOF
1944+
return nil, ms.state.FirstSeq, ErrStoreEOF
19351945
}
19361946

19371947
// LoadPrevMsgMulti will find the previous message matching any entry in the sublist.

server/norace_2_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2969,7 +2969,7 @@ func TestNoRaceStoreReverseWalkWithDeletesPerf(t *testing.T) {
29692969
seq, seen := ss.LastSeq, 0
29702970
start = time.Now()
29712971
for {
2972-
sm, err := store.LoadPrevMsg(seq, &smv)
2972+
sm, _, err := store.LoadPrevMsg(_EMPTY_, false, seq, &smv)
29732973
if err == ErrStoreEOF {
29742974
break
29752975
}

server/store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ type StreamStore interface {
100100
LoadNextMsg(filter string, wc bool, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error)
101101
LoadNextMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error)
102102
LoadLastMsg(subject string, sm *StoreMsg) (*StoreMsg, error)
103-
LoadPrevMsg(start uint64, smp *StoreMsg) (sm *StoreMsg, err error)
103+
LoadPrevMsg(filter string, wc bool, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error)
104104
LoadPrevMsgMulti(sl *gsl.SimpleSublist, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error)
105105
RemoveMsg(seq uint64) (bool, error)
106106
EraseMsg(seq uint64) (bool, error)

server/store_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -786,6 +786,40 @@ func TestStoreMsgLoadPrevMsgMulti(t *testing.T) {
786786
)
787787
}
788788

789+
func TestStoreMsgLoadPrevMsg(t *testing.T) {
790+
testAllStoreAllPermutations(
791+
t, false,
792+
StreamConfig{Name: "zzz", Subjects: []string{"foo.*", "bar.*"}},
793+
func(t *testing.T, fs StreamStore) {
794+
for _, subj := range []string{"foo.1", "bar.1", "foo.2", "bar.2", "foo.3"} {
795+
_, _, err := fs.StoreMsg(subj, nil, []byte("ZZZ"), 0)
796+
require_NoError(t, err)
797+
}
798+
799+
var sm StoreMsg
800+
801+
smp, seq, err := fs.LoadPrevMsg(_EMPTY_, false, 5, &sm)
802+
require_NoError(t, err)
803+
require_Equal(t, smp.subj, "foo.3")
804+
require_Equal(t, seq, uint64(5))
805+
806+
smp, seq, err = fs.LoadPrevMsg("foo.2", false, 5, &sm)
807+
require_NoError(t, err)
808+
require_Equal(t, smp.subj, "foo.2")
809+
require_Equal(t, seq, uint64(3))
810+
811+
smp, seq, err = fs.LoadPrevMsg("foo.*", true, 5, &sm)
812+
require_NoError(t, err)
813+
require_Equal(t, smp.subj, "foo.3")
814+
require_Equal(t, seq, uint64(5))
815+
816+
_, seq, err = fs.LoadPrevMsg("baz.*", true, 5, &sm)
817+
require_Error(t, err, ErrStoreEOF)
818+
require_Equal(t, seq, uint64(1))
819+
},
820+
)
821+
}
822+
789823
func TestStoreMsgLoadPrevMsgMultiFullWildcardSkip(t *testing.T) {
790824
testAllStoreAllPermutations(
791825
t, false,

0 commit comments

Comments
 (0)