• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

go线程安全哈希表concurrent-map

武飞扬头像
guoguangwu
帮助1

github.com/orcaman/concurrent-map

这个哈希表是基于golang提供的map来实现线程安全的哈希表。map的key只能是string

下面给一个多线程操作的例子,该map的value为string

  1.  
    package main
  2.  
     
  3.  
    import (
  4.  
    "fmt"
  5.  
    "github.com/orcaman/concurrent-map/v2"
  6.  
    "sync"
  7.  
    )
  8.  
     
  9.  
    func main() {
  10.  
    var wg sync.WaitGroup
  11.  
    // Create a new map.
  12.  
    m := cmap.New[string]()
  13.  
     
  14.  
    wg.Add(1)
  15.  
    // Sets item within map, sets "bar" under key "foo"
  16.  
    go func() {
  17.  
    defer wg.Done()
  18.  
    m.Set("foo", "bar1111")
  19.  
     
  20.  
    m.Set("foo1", "bar11112")
  21.  
    m.Set("test1", "val1")
  22.  
    }()
  23.  
     
  24.  
    wg.Add(1)
  25.  
     
  26.  
    go func() {
  27.  
    defer wg.Done()
  28.  
    fmt.Println("====================")
  29.  
    m.Set("111", "2222")
  30.  
    for item := range m.IterBuffered() {
  31.  
    fmt.Println("==== kv :", item)
  32.  
    }
  33.  
    fmt.Println("====================")
  34.  
    } ()
  35.  
     
  36.  
    // Retrieve item from map.
  37.  
    go func () {
  38.  
    bar, ok := m.Get("foo")
  39.  
    fmt.Println("bar : ", bar, ", ok :", ok)
  40.  
    // Removes item under key "foo"
  41.  
    m.Remove("foo")
  42.  
    }()
  43.  
    wg.Add(1)
  44.  
     
  45.  
    go func() {
  46.  
    defer wg.Done()
  47.  
    m.Set("3333", "4444")
  48.  
     
  49.  
    fmt.Println("-----------------")
  50.  
    for item := range m.IterBuffered() {
  51.  
    fmt.Println("---- kv :", item)
  52.  
    }
  53.  
    fmt.Println("-----------------")
  54.  
     
  55.  
    }()
  56.  
     
  57.  
    wg.Wait()
  58.  
    }

运行结果:

  1.  
    ====================
  2.  
    bar : bar1111 , ok : true
  3.  
    -----------------
  4.  
    ==== kv : {3333 4444}
  5.  
    ---- kv : {3333 4444}
  6.  
    ---- kv : {111 2222}
  7.  
    ==== kv : {111 2222}
  8.  
    ---- kv : {foo1 bar11112}
  9.  
    ---- kv : {test1 val1}
  10.  
    ==== kv : {foo1 bar11112}
  11.  
    -----------------
  12.  
    ==== kv : {test1 val1}
  13.  
    ====================
  14.  
     
  15.  
    Process finished with the exit code 0
  1.  
    -----------------
  2.  
    bar : bar1111 , ok : true
  3.  
    ====================
  4.  
    ---- kv : {3333 4444}
  5.  
    ---- kv : {111 2222}
  6.  
    ---- kv : {foo1 bar11112}
  7.  
    ---- kv : {test1 val1}
  8.  
    -----------------
  9.  
    ==== kv : {3333 4444}
  10.  
    ==== kv : {111 2222}
  11.  
    ==== kv : {foo1 bar11112}
  12.  
    ==== kv : {test1 val1}
  13.  
    ====================
  14.  
     
  15.  
    Process finished with the exit code 0

运行结果比较随机。

下面是对该哈希表的源码分析:

数据结构:

  1.  
    /*
  2.  
    *多少个子哈希表,这个可以修改,如果想减少冲突,可以参考之前的博客中的github.com/tidwall/shardmap哈希表的计算方式,即根据实际的物理CPU核心数来计算
  3.  
    */
  4.  
    var SHARD_COUNT = 32
  5.  
     
  6.  
    // A "thread" safe map of type string:Anything.
  7.  
    // To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards.
  8.  
    type ConcurrentMap[V any] []*ConcurrentMapShared[V]//对外暴露的哈希结构,是ConcurrentMapShared的指针数组,其中V any的用法类似于c 的模板,传入的结构就是value的结构
  9.  
     
  10.  
    // A "thread" safe string to anything map.
  11.  
    type ConcurrentMapShared[V any] struct {
  12.  
    items map[string]V//key为string就是在这里决定的,这里使用的是golang自带的哈希表,减少重复造轮子
  13.  
    sync.RWMutex // Read Write mutex, guards access to internal map.//每个哈希表一个读写锁
  14.  
    }

创建哈希表cmap.New:

  1.  
    // Creates a new concurrent map.
  2.  
    func New[V any]() ConcurrentMap[V] {
  3.  
    m := make(ConcurrentMap[V], SHARD_COUNT)//创建哈希表结构
  4.  
    for i := 0; i < SHARD_COUNT; i {//循环创建子哈希表
  5.  
    m[i] = &ConcurrentMapShared[V]{items: make(map[string]V)}
  6.  
    }
  7.  
    return m
  8.  
    }

添加元素Set:

  1.  
    // Sets the given value under the specified key.
  2.  
    func (m ConcurrentMap[V]) Set(key string, value V) {
  3.  
    // Get map shard.
  4.  
    shard := m.GetShard(key)//根据传入的key,计算此key对应在哪个子哈希表中,
  5.  
    shard.Lock()//使用对应的锁进行加写锁
  6.  
    shard.items[key] = value//赋值(如果已存在,替换)
  7.  
    shard.Unlock()//解锁
  8.  
    }

哈希函数进行散列(fnv哈希算法):

  1.  
    // GetShard returns shard under given key
  2.  
    func (m ConcurrentMap[V]) GetShard(key string) *ConcurrentMapShared[V] {
  3.  
    return m[uint(fnv32(key))%uint(SHARD_COUNT)]
  4.  
    }
  5.  
     
  6.  
    func fnv32(key string) uint32 {
  7.  
    hash := uint32(2166136261)
  8.  
    const prime32 = uint32(16777619)
  9.  
    keyLength := len(key)
  10.  
    for i := 0; i < keyLength; i {
  11.  
    hash *= prime32
  12.  
    hash ^= uint32(key[i])
  13.  
    }
  14.  
    return hash
  15.  
    }
Get函数:
  1.  
    // Get retrieves an element from map under given key.
  2.  
    func (m ConcurrentMap[V]) Get(key string) (V, bool) {//基本逻辑和Set一样,不过因为是读操作,所以加读锁
  3.  
    // Get shard
  4.  
    shard := m.GetShard(key)
  5.  
    shard.RLock()
  6.  
    // Get item from shard.
  7.  
    val, ok := shard.items[key]
  8.  
    shard.RUnlock()
  9.  
    return val, ok
  10.  
    }

代码中有个遍历哈希表操作

for item := range m.IterBuffered() {......}

具体代码实现如下:

  1.  
    // Used by the Iter & IterBuffered functions to wrap two variables together over a channel,
  2.  
    type Tuple[V any] struct {//键值对结构
  3.  
    Key string
  4.  
    Val V
  5.  
    }
  6.  
     
  7.  
     
  8.  
    ......
  9.  
    // IterBuffered returns a buffered iterator which could be used in a for range loop.
  10.  
    func (m ConcurrentMap[V]) IterBuffered() <-chan Tuple[V] {
  11.  
    chans := snapshot(m)//获取快照数据,其实这里就是将所有子哈希表当前数据的键值对信息,后面增加或者删除了元素不影响,
  12.  
    total := 0
  13.  
    for _, c := range chans {//遍历chan数组个数
  14.  
    total = cap(c)//每个元素都是一个一维数组
  15.  
    }
  16.  
    ch := make(chan Tuple[V], total)//根据计算键值对的个数分配数组
  17.  
    go fanIn(chans, ch)//并发去将chans的内容写入ch中,这里为什么不直接用chans,因为chans是二维数组,这里的操作是将二维数组转换成一维数组
  18.  
    return ch
  19.  
    }

我们看一下快照函数的实现:

  1.  
    // Returns a array of channels that contains elements in each shard,
  2.  
    // which likely takes a snapshot of `m`.
  3.  
    // It returns once the size of each buffered channel is determined,
  4.  
    // before all the channels are populated using goroutines.
  5.  
    func snapshot[V any](m ConcurrentMap[V]) (chans []chan Tuple[V]) {
  6.  
    //When you access map items before initializing.
  7.  
    if len(m) == 0 {
  8.  
    panic(`cmap.ConcurrentMap is not initialized. Should run New() before usage.`)
  9.  
    }
  10.  
    chans = make([]chan Tuple[V], SHARD_COUNT)//根据实际的子哈希表个数分配chan 数组
  11.  
    wg := sync.WaitGroup{}
  12.  
    wg.Add(SHARD_COUNT)//设置等待goroutine个数,因为我们需要对每个子表进行遍历操作,所以还是SHARD_COUNT
  13.  
    // Foreach shard.
  14.  
    for index, shard := range m {//遍历子表
  15.  
    go func(index int, shard *ConcurrentMapShared[V]) {//启动goroutine进行操作,提升性能
  16.  
    // Foreach key, value pair.
  17.  
    shard.RLock()//使用对应的锁加读锁
  18.  
    chans[index] = make(chan Tuple[V], len(shard.items))
  19.  
    wg.Done()
  20.  
    for key, val := range shard.items {//遍历子表,将对应键值对复制到对应的chans的子数组中
  21.  
    chans[index] <- Tuple[V]{key, val}
  22.  
    }
  23.  
    shard.RUnlock()
  24.  
    close(chans[index])
  25.  
    }(index, shard)
  26.  
    }
  27.  
    wg.Wait()//这里会等待所有子表都遍历完,才会返回,所以如果散列的不够均匀,等待时间会长
  28.  
    return chans
  29.  
    }

我们来看将二维数组聚合成一维数组的过程fanIn:

  1.  
    // fanIn reads elements from channels `chans` into channel `out`
  2.  
    func fanIn[V any](chans []chan Tuple[V], out chan Tuple[V]) {
  3.  
    wg := sync.WaitGroup{}
  4.  
    wg.Add(len(chans))
  5.  
    for _, ch := range chans {//遍历每一个chan,一个子表一个chan
  6.  
    go func(ch chan Tuple[V]) {
  7.  
    for t := range ch {//遍历子表的键值对
  8.  
    out <- t//全部赋值到out的一维数组中
  9.  
    }
  10.  
    wg.Done()
  11.  
    }(ch)
  12.  
    }
  13.  
    wg.Wait()
  14.  
    close(out)
  15.  
    }

其实之前还有一个老的Iter版本,注释说性能比较差。我们看看它的实现:

  1.  
    // Iter returns an iterator which could be used in a for range loop.
  2.  
    //
  3.  
    // Deprecated: using IterBuffered() will get a better performence
  4.  
    func (m ConcurrentMap[V]) Iter() <-chan Tuple[V] {
  5.  
    chans := snapshot(m)
  6.  
    ch := make(chan Tuple[V])
  7.  
    go fanIn(chans, ch)
  8.  
    return ch
  9.  
    }

唯一的区别就是没有指定ch的的大小。chan内部实现是一个环形队列,如果队列不够大,会等待处理,性能差。

例子中还用到了Remove接口,其实和set大同小异,底层实现使用的是系统自带的delete函数。其它的函数后面用到了再来追加,基本逻辑都是一样。

备注:

里面有两个函数比较好一个是MSet,这个用于将一个map数据导入到这个并发安全的map中。

第二个是Upsert,这个函数用于修改或添加键值对,提供了UpsertCb回调函数,也就是说,可以提供自己的逻辑来处理数据,比如在历史数据上面进行修改,比较方便。

下面是一部分测试代码:

  1.  
    ......//这是上面的测试用例代码,我们只需要在main函数中调用下UpdateTest()就可以了。
  2.  
    func UpdateTest() {
  3.  
    m := cmap.New[int64]()
  4.  
    m.Set("test", 1)
  5.  
    m.Upsert("test", 3, UpsertCb)
  6.  
    v, ok :=m.Get("test")
  7.  
    if ok {
  8.  
    fmt.Println(v)
  9.  
    }
  10.  
    }
  11.  
     
  12.  
    func UpsertCb (exist bool, valueInMap int64, newValue int64) int64{
  13.  
    if exist {
  14.  
    return valueInMap newValue
  15.  
    } else {
  16.  
    return newValue
  17.  
    }
  18.  
    }

运行结果如下:

  1.  
    ====================
  2.  
    bar : bar1111 , ok : true
  3.  
    -----------------
  4.  
    ==== kv : {foo1 bar11112}
  5.  
    ---- kv : {foo1 bar11112}
  6.  
    ==== kv : {3333 4444}
  7.  
    ---- kv : {3333 4444}
  8.  
    ==== kv : {111 2222}
  9.  
    ==== kv : {test1 val1}
  10.  
    ====================
  11.  
    ---- kv : {111 2222}
  12.  
    ---- kv : {test1 val1}
  13.  
    -----------------
  14.  
    4
  15.  
     
  16.  
    Process finished with the exit code 0

运行结果为4,我们的测试代码就是将新旧值相互累加后返回,如果存在旧值的话。这里就能看到它比sync.Map好的地方,这样就能进行临界控制来修改旧值。

同理RemoveCb也是一样的道理。这里就不一个一个地分析了。

这里还有个地方需要注意就是Keys函数,它在刚进入函数的时候就获取该哈希表的总的个数,然后分配了一个一维数组chan,个数刚好就是此时哈希表的总个数,然后循环遍历哈希表将所有的key,最后转换成一个一维数组。需要考虑一个问题,如果此时往该哈希表中添加一个元素会不会出问题?程序出来这么久,应该不会问题,整个哈希表遍历完,就调用close channel,就算数据少了也会直接返回,不会有问题,如果多了,channel也是能获取到新增的数据。

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfiffbb
系列文章
更多 icon
同类精品
更多 icon
继续加载