Skip to content

阻塞读且并发安全的map问题 #76

@cxlearning

Description

@cxlearning

原代码存在的问题:

  1. Out:若存在item且item.isExist=false时,没有将其置为true
  2. Rd:若存在两个协程a、b同时读相同的key且都阻塞在m.rmx.Lock(),可能会存在a刚放入m的item会被b覆盖

原版

type sp interface {
	Out(key string, val interface{})                  //存入key /val,如果该key读取的goroutine挂起,则唤醒。此方法不会阻塞,时刻都可以立即执行并返回
	Rd(key string, timeout time.Duration) interface{} //读取一个key,如果key不存在阻塞,等待key存在或者超时
}

type Map struct {
	c   map[string]*entry
	rmx *sync.RWMutex
}
type entry struct {
	ch      chan struct{}
	value   interface{}
	isExist bool
}

func (m *Map) Out(key string, val interface{}) {
	m.rmx.Lock()
	defer m.rmx.Unlock()
	item, ok := m.c[key]
	if !ok {
		m.c[key] = &entry{
			value:   val,
			isExist: true,
		}
		return
	}
	item.value = val
	if !item.isExist {
		if item.ch != nil {
			close(item.ch)
			item.ch = nil
		}
	}
	return
}

func (m *Map) Rd(key string, timeout time.Duration) interface{} {
	m.rmx.RLock()
	if e, ok := m.c[key]; ok && e.isExist {
		m.rmx.RUnlock()
		return e.value
	} else if !ok {
		m.rmx.RUnlock()
		m.rmx.Lock()
		e = &entry{ch: make(chan struct{}), isExist: false}
		m.c[key] = e
		m.rmx.Unlock()
		log.Println("协程阻塞 -> ", key)
		select {
		case <-e.ch:
			return e.value
		case <-time.After(timeout):
			log.Println("协程超时 -> ", key)
			return nil
		}
	} else {
		m.rmx.RUnlock()
		log.Println("协程阻塞 -> ", key)
		select {
		case <-e.ch:
			return e.value
		case <-time.After(timeout):
			log.Println("协程超时 -> ", key)
			return nil
		}
	}
}

func main() {
	mapval := Map{
		c:   make(map[string]*entry),
		rmx: &sync.RWMutex{},
	}

	for i := 0; i < 10; i++ {
		go func() {
			val := mapval.Rd("key", time.Second*6)
			log.Println("读取值为->", val)
		}()
	}

	time.Sleep(time.Second * 3)
	for i := 0; i < 10; i++ {
		go func(val int) {
			mapval.Out("key", val)
		}(i)
	}

	time.Sleep(time.Second * 30)
}
func init() {
	log.SetFlags(log.LstdFlags | log.Lshortfile)
}

改进

type sp interface {
	Out(key string, val interface{})                  //存入key /val,如果该key读取的goroutine挂起,则唤醒。此方法不会阻塞,时刻都可以立即执行并返回
	Rd(key string, timeout time.Duration) interface{} //读取一个key,如果key不存在阻塞,等待key存在或者超时
}

type xMap struct {
	c  map[string]*entry
	mx *sync.Mutex
}
type entry struct {
	ch      chan struct{}
	value   interface{}
	isExist bool
}

func (x *xMap) Out(key string, val interface{}) {
	x.mx.Lock()
	defer x.mx.Unlock()

	if e, ok := x.c[key]; ok {
		exist := e.isExist
		e.value = val
		e.isExist = true

		if !exist && e.ch != nil {
			close(e.ch)
			e.ch = nil
		}
		return
	}

	e := &entry{
		value:   val,
		isExist: true,
		ch:      nil,
	}
	x.c[key] = e
}

func (x *xMap) Rd(key string, timeout time.Duration) interface{} {
	x.mx.Lock()
	defer x.mx.Unlock()

	if e, ok := x.c[key]; ok {
		if e.isExist {
			return e.value
		}

		select {
		case <-e.ch:
			return e.value
		case <-time.After(timeout):
			return nil
		}
	}

	e := &entry{
		value:   nil,
		isExist: false,
		ch:      make(chan struct{}),
	}
	x.c[key] = e
	select {
	case <-e.ch:
		return e.value
	case <-time.After(timeout):
		return nil
	}
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions