• 欢迎访问web前端中文站,JavaScript,CSS3,HTML5,web前端demo
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏web前端中文站吧

ConcurrentDictionary并发字典知多少?

JavaScript web前端中文站 6个月前 (03-19) 919次浏览 已收录 0个评论

背景

在上一篇文章你真的了解字典吗?一文中我介绍了 Hash Function 和字典的工作的基本原理.
有网友在文章底部评论,说我的 Remove 和 Add 方法没有考虑线程安全问题.
https://docs.microsoft.com/en-us/dotnet/api/system.collections.generic.dictionary-2?redirectedfrom=MSDN&view=netframework-4.7.2
查阅相关资料后,发现字典.net 中 Dictionary 本身时不支持线程安全的,如果要想使用支持线程安全的字典,那么我们就要使用 ConcurrentDictionary 了.
在研究 ConcurrentDictionary 的源码后,我觉得在 ConcurrentDictionary 的线程安全的解决思路很有意思,其对线程安全的处理对对我们项目中的其他高并发场景也有一定的参考价值,在这里再次分享我的一些学习心得和体会,希望对大家有所帮助.

更多精彩内容请看 web 前端中文站
http://www.lisa33xiaoq.net 可按 Ctrl + D 进行收藏

Concurrent

ConcurrentDictionary 是 Dictionary 的线程安全版本,位于 System.Collections.Concurrent 的命名空间下,该命名空间下除了有 ConcurrentDictionary,还有以下 Class 都是我们常用的那些类库的线程安全版本.

BlockingCollection:为实现 IProducerConsumerCollection 的线程安全集合提供阻塞和限制功能。

ConcurrentBag:表示对象的线程安全的无序集合.

ConcurrentQueue:表示线程安全的先进先出 (FIFO) 集合。

如果读过我上一篇文章你真的了解字典吗?的小伙伴,对这个ConcurrentDictionary的工作原理应该也不难理解,它是简简单单地在读写方法加个lock吗?

工作原理

Dictionary

如下图所示,在字典中,数组 entries 用来存储数据,buckets 作为桥梁,每次通过 hash function 获取了 key 的哈希值后,对这个哈希值进行取余,即hashResult%bucketsLength=bucketIndex,余数作为 buckets 的 index,而 buckets 的 value 就是这个 key 对应的 entry 所在 entries 中的索引,所以最终我们就可以通过这个索引在 entries 中拿到我们想要的数据,整个过程不需要对所有数据进行遍历,的时间复杂度为 1.

ConcurrentDictionary 并发字典知多少?

ConcurrentDictionary

ConcurrentDictionary 的数据存储类似,只是 buckets 有个更多的职责,它除了有 dictionary 中的 buckets 的桥梁的作用外,负责了数据存储.

ConcurrentDictionary 并发字典知多少?

key 的哈希值与 buckets 的 length 取余后hashResult%bucketsLength=bucketIndex,余数作为 buckets 的索引就能找到我们要的数据所存储的块,当出现两个 key 指向同一个块时,即上图中的 John Smith 和 Sandra Dee 他同时指向 152 怎么办呢?存储节点 Node 具有 Next 属性执行下个 Node,上图中,node 152 的 Next 为 154,即我们从 152 开始找 Sandra Dee,发现不是我们想要的,再到 154 找,即可取到所需数据.

由于官方原版的源码较为复杂,理解起来有所难度,我对官方源码做了一些精简,下文将围绕这个精简版的 ConcurrentDictionary 展开叙述.
https://github.com/liuzhenyulive/DictionaryMini

数据结构

Node

ConcurrentDictionary 中的每个数据存储在一个 Node 中,它除了存储 value 信息,还存储 key 信息,以及 key 对应的 hashcode

private class Node         {             internal TKey m_key;   //数据的 key             internal TValue m_value;  //数据值             internal volatile Node m_next;  //当前 Node 的下级节点             internal int m_hashcode;  //key 的 hashcode              //构造函数             internal Node(TKey key, TValue value, int hashcode, Node next)             {                 m_key = key;                 m_value = value;                 m_next = next;                 m_hashcode = hashcode;             }         }

Table

而整个 ConcurrentDictionary 的数据存储在这样的一个 Table 中,其中 m_buckets 的 Index 负责映射 key,m_locks 是线程锁,下文中会有详细介绍,m_countPerLock 存储每个 lock 锁负责的 node 数量.

  private class Tables         {             internal readonly Node[] m_buckets;   //上文中提到的 buckets             internal readonly object[] m_locks;   //线程锁             internal volatile int[] m_countPerLock;  //索格锁所管理的数据数量             internal readonly IEqualityComparer<TKey> m_comparer;  //当前 key 对应的 type 的比较器              //构造函数             internal Tables(Node[] buckets, object[] locks, int[] countPerlock, IEqualityComparer<TKey> comparer)             {                 m_buckets = buckets;                 m_locks = locks;                 m_countPerLock = countPerlock;                 m_comparer = comparer;             }         } 

ConcurrentDictionary 会在构造函数中创建 Table,这里我对原有的构造函数进行了简化,通过默认值进行创建,其中 DefaultConcurrencyLevel 默认并发级别为当前计算机处理器的线程数.

        //构造函数         public ConcurrentDictionaryMini() : this(DefaultConcurrencyLevel, DEFAULT_CAPACITY, true,             EqualityComparer<TKey>.Default)         {         }          /// <summary>         ///         /// </summary>         /// <param name="concurrencyLevel">并发等级,默认为 CPU 的线程数</param>         /// <param name="capacity">默认容量,31,超过 31 后会自动扩容</param>         /// <param name="growLockArray">时否动态扩充锁的数量</param>         /// <param name="comparer">key 的比较器</param>         internal ConcurrentDictionaryMini(int concurrencyLevel, int capacity, bool growLockArray, IEqualityComparer<TKey> comparer)         {             if (concurrencyLevel < 1)             {                 throw new Exception("concurrencyLevel 必须为正数");             }              if (capacity < 0)             {                 throw new Exception("capacity 不能为负数.");             }              if (capacity < concurrencyLevel)             {                 capacity = concurrencyLevel;             }              object[] locks = new object[concurrencyLevel];             for (int i = 0; i < locks.Length; i++)             {                 locks[i] = new object();             }              int[] countPerLock = new int[locks.Length];             Node[] buckets = new Node[capacity];             m_tables = new Tables(buckets, locks, countPerLock, comparer);              m_growLockArray = growLockArray;             m_budget = buckets.Length / locks.Length;         } 

方法

ConcurrentDictionary 中较为基础重点的方法分别位 Add,Get,Remove,Grow Table 方法,其他方法基本上是建立在这四个方法的基础上进行的扩充.

Add

向 Table 中添加元素有以下亮点值得我们关注.

  • 开始操作前会声明一个 tables 变量来存储操作开始前的 m_tables,在正式开始操作后(进入 lock)的时候,会检查 tables 在准备工作阶段是否别的线程改变,如果改变了,则重新开始准备工作并从新开始.

  • 通过 GetBucketAndLockNo 方法获取 bucket 索引以及 lock 索引,其内部就是取余操作.

 private void GetBucketAndLockNo(             int hashcode, out int bucketNo, out int lockNo, int bucketCount, int lockCount)         {             //0x7FFFFFFF 是 long int 的最大值 与它按位与数据小于等于这个最大值             bucketNo = (hashcode & 0x7fffffff) % bucketCount;             lockNo = bucketNo % lockCount;         }
  • 对数据进行操作前会从 m_locks 取出第 lockNo 个对象最为 lock,操作完成后释放该 lock.多个 lock 一定程度上减少了阻塞的可能性.

  • 在对数据进行更新时,如果该 Value 的 Type 为允许原子性写入的,则直接更新该 Value,否则创建一个新的 node 进行覆盖.

        /// <summary>         /// Determines whether type TValue can be written atomically         /// </summary>         private static bool IsValueWriteAtomic()         {             Type valueType = typeof(TValue);              //             // Section 12.6.6 of ECMA CLI explains which types can be read and written atomically without             // the risk of tearing.             //             // See http://www.ecma-international.org/publications/files/ECMA-ST/ECMA-335.pdf             //             if (valueType.IsClass)             {                 return true;             }             switch (Type.GetTypeCode(valueType))             {                 case TypeCode.Boolean:                 case TypeCode.Byte:                 case TypeCode.Char:                 case TypeCode.Int16:                 case TypeCode.Int32:                 case TypeCode.SByte:                 case TypeCode.Single:                 case TypeCode.UInt16:                 case TypeCode.UInt32:                     return true;                  case TypeCode.Int64:                 case TypeCode.Double:                 case TypeCode.UInt64:                     return IntPtr.Size == 8;                  default:                     return false;             }         }

该方法依据 CLI 规范进行编写,简单来说,32 位的计算机,对 32 字节以下的数据类型写入时可以一次写入的而不需要移动内存指针,64 位计算机对 64 位以下的数据可一次性写入,不需要移动内存指针.保证了写入的安全.
详见 12.6.6 http://www.ecma-international.org/publications/files/ECMA-ST/ECMA-335.pdf

  private bool TryAddInternal(TKey key, TValue value, bool updateIfExists, bool acquireLock, out TValue resultingValue)         {             while (true)             {                 int bucketNo, lockNo;                 int hashcode;                  //https://www.cnblogs.com/blurhkh/p/10357576.html                 //需要了解一下值传递和引用传递                 Tables tables = m_tables;                 IEqualityComparer<TKey> comparer = tables.m_comparer;                 hashcode = comparer.GetHashCode(key);                  GetBucketAndLockNo(hashcode, out bucketNo, out lockNo, tables.m_buckets.Length, tables.m_locks.Length);                  bool resizeDesired = false;                 bool lockTaken = false;                  try                 {                     if (acquireLock)                         Monitor.Enter(tables.m_locks[lockNo], ref lockTaken);                      //如果表刚刚调整了大小,我们可能没有持有正确的锁,必须重试。                     //当然这种情况很少见                     if (tables != m_tables)                         continue;                      Node prev = null;                     for (Node node = tables.m_buckets[bucketNo]; node != null; node = node.m_next)                     {                         if (comparer.Equals(node.m_key, key))                         {                             //key 在字典里找到了。如果允许更新,则更新该 key 的值。                             //我们需要为更新创建一个 node,以支持不能以原子方式写入的 TValue 类型,因为 free-lock 读取可能同时发生。                             if (updateIfExists)                             {                                 if (s_isValueWriteAtomic)                                 {                                     node.m_value = value;                                 }                                 else                                 {                                     Node newNode = new Node(node.m_key, value, hashcode, node.m_next);                                     if (prev == null)                                     {                                         tables.m_buckets[bucketNo] = newNode;                                     }                                     else                                     {                                         prev.m_next = newNode;                                     }                                 }                                  resultingValue = value;                             }                             else                             {                                 resultingValue = node.m_value;                             }                              return false;                         }                          prev = node;                     }                      //key 没有在 bucket 中找到,则插入该数据                     Volatile.Write(ref tables.m_buckets[bucketNo], new Node(key, value, hashcode, tables.m_buckets[bucketNo]));                     //当 m_countPerLock 超过 Int Max 时会抛出 OverflowException                     checked                     {                         tables.m_countPerLock[lockNo]++;                     }                      //                     // 如果 m_countPerLock[lockNo] > m_budget,则需要调整 buckets 的大小。                     // GrowTable 也可能会增加 m_budget,但不会调整 bucket table 的大小。.                     // 如果发现 bucket table 利用率很低,也会发生这种情况。                     //                     if (tables.m_countPerLock[lockNo] > m_budget)                     {                         resizeDesired = true;                     }                 }                 finally                 {                     if (lockTaken)                         Monitor.Exit(tables.m_locks[lockNo]);                 }                  if (resizeDesired)                 {                     GrowTable(tables, tables.m_comparer, false, m_keyRehashCount);                 }                  resultingValue = value;                 return true;             }         } 

Get

从 Table 中获取元素的的流程与前文介绍 ConcurrentDictionary 工作原理时一致,但有以下亮点值得关注.

  • 读取 bucket[i]在 Volatile.Read()方法中进行,该方法会自动对读取出来的数据加锁,避免在读取的过程中,数据被其他线程 remove 了.
  • Volatile 读取指定字段时,在读取的内存中插入一个内存屏障,阻止处理器重新排序内存操作,如果在代码中此方法之后出现读取或写入,则处理器无法在此方法之前移动它。
  public bool TryGetValue(TKey key, out TValue value)         {             if (key == null) throw new ArgumentNullException("key");              // We must capture the m_buckets field in a local variable. It is set to a new table on each table resize.             Tables tables = m_tables;             IEqualityComparer<TKey> comparer = tables.m_comparer;             GetBucketAndLockNo(comparer.GetHashCode(key), out var bucketNo, out _, tables.m_buckets.Length, tables.m_locks.Length);              // We can get away w/out a lock here.             // The Volatile.Read ensures that the load of the fields of 'n' doesn't move before the load from buckets[i].             Node n = Volatile.Read(ref tables.m_buckets[bucketNo]);              while (n != null)             {                 if (comparer.Equals(n.m_key, key))                 {                     value = n.m_value;                     return true;                 }                 n = n.m_next;             }              value = default(TValue);             return false;         } 

Remove

Remove 方法实现其实也并不复杂,类似我们链表操作中移除某个 Node.移除节点的同时,还要对前后节点进行链接,相信一块小伙伴们肯定很好理解.

 private bool TryRemoveInternal(TKey key, out TValue value, bool matchValue, TValue oldValue)         {             while (true)             {                 Tables tables = m_tables;                  IEqualityComparer<TKey> comparer = tables.m_comparer;                  int bucketNo, lockNo;                  GetBucketAndLockNo(comparer.GetHashCode(key), out bucketNo, out lockNo, tables.m_buckets.Length, tables.m_locks.Length);                  lock (tables.m_locks[lockNo])                 {                     if (tables != m_tables)                         continue;                      Node prev = null;                     for (Node curr = tables.m_buckets[bucketNo]; curr != null; curr = curr.m_next)                     {                         if (comparer.Equals(curr.m_key, key))                         {                             if (matchValue)                             {                                 bool valuesMatch = EqualityComparer<TValue>.Default.Equals(oldValue, curr.m_value);                                 if (!valuesMatch)                                 {                                     value = default(TValue);                                     return false;                                 }                             }                             if (prev == null)                                 Volatile.Write(ref tables.m_buckets[bucketNo], curr.m_next);                             else                             {                                 prev.m_next = curr.m_next;                             }                              value = curr.m_value;                             tables.m_countPerLock[lockNo]--;                             return true;                         }                          prev = curr;                     }                 }                  value = default(TValue);                 return false;             }         }

Grow table

当 table 中任何一个 m_countPerLock 的数量超过了设定的阈值后,会触发此操作对 Table 进行扩容.

private void GrowTable(Tables tables, IEqualityComparer<TKey> newComparer, bool regenerateHashKeys,             int rehashCount)         {             int locksAcquired = 0;             try             {                 //首先锁住第一个 lock 进行 resize 操作.                 AcquireLocks(0, 1, ref locksAcquired);                  if (regenerateHashKeys && rehashCount == m_keyRehashCount)                 {                     tables = m_tables;                 }                 else                 {                     if (tables != m_tables)                         return;                      long approxCount = 0;                     for (int i = 0; i < tables.m_countPerLock.Length; i++)                     {                         approxCount += tables.m_countPerLock[i];                     }                      //如果 bucket 数组太空,则将预算加倍,而不是调整表的大小                     if (approxCount < tables.m_buckets.Length / 4)                     {                         m_budget = 2 * m_budget;                         if (m_budget < 0)                         {                             m_budget = int.MaxValue;                         }                          return;                     }                 }                  int newLength = 0;                 bool maximizeTableSize = false;                 try                 {                     checked                     {                         newLength = tables.m_buckets.Length * 2 + 1;                         while (newLength % 3 == 0 || newLength % 5 == 0 || newLength % 7 == 0)                         {                             newLength += 2;                         }                     }                 }                 catch (OverflowException)                 {                     maximizeTableSize = true;                 }                  if (maximizeTableSize)                 {                     newLength = int.MaxValue;                      m_budget = int.MaxValue;                 }                  AcquireLocks(1, tables.m_locks.Length, ref locksAcquired);                  object[] newLocks = tables.m_locks;                  //Add more locks                 if (m_growLockArray && tables.m_locks.Length < MAX_LOCK_NUMBER)                 {                     newLocks = new object[tables.m_locks.Length * 2];                     Array.Copy(tables.m_locks, newLocks, tables.m_locks.Length);                      for (int i = tables.m_locks.Length; i < newLocks.Length; i++)                     {                         newLocks[i] = new object();                     }                 }                  Node[] newBuckets = new Node[newLength];                 int[] newCountPerLock = new int[newLocks.Length];                  for (int i = 0; i < tables.m_buckets.Length; i++)                 {                     Node current = tables.m_buckets[i];                     while (current != null)                     {                         Node next = current.m_next;                         int newBucketNo, newLockNo;                         int nodeHashCode = current.m_hashcode;                          if (regenerateHashKeys)                         {                             //Recompute the hash from the key                             nodeHashCode = newComparer.GetHashCode(current.m_key);                         }                          GetBucketAndLockNo(nodeHashCode, out newBucketNo, out newLockNo, newBuckets.Length,                             newLocks.Length);                          newBuckets[newBucketNo] = new Node(current.m_key, current.m_value, nodeHashCode,                             newBuckets[newBucketNo]);                         checked                         {                             newCountPerLock[newLockNo]++;                         }                          current = next;                     }                 }                  if (regenerateHashKeys)                 {                     unchecked                     {                         m_keyRehashCount++;                     }                 }                  m_budget = Math.Max(1, newBuckets.Length / newLocks.Length);                  m_tables = new Tables(newBuckets, newLocks, newCountPerLock, newComparer);             }             finally             {                 ReleaseLocks(0, locksAcquired);             }         } 

学习感悟

  • lock[]:在以往的线程安全上,我们对数据的保护往往是对数据的修改写入等地方加上 lock,这个 lock 经常上整个上下文中唯一的,这样的设计下就可能会出现多个线程,写入的根本不是一块数据,却要等待前一个线程写入完成下一个线程才能继续操作.在 ConcurrentDictionary 中,通过哈希算法,从数组lock[]中找出 key 的准确 lock,如果不同的 key,使用的不是同一个 lock,那么这多个线程的写入时互不影响的.

  • 写入要考虑线程安全,读取呢?不可否认,在大部分场景下,读取不必去考虑线程安全,但是在我们这样的链式读取中,需要自上而下地查找,是不是有种可能在查找个过程中,链路被修改了呢?所以 ConcurrentDictionary 中使用 Volatile.Read 来读取出数据,该方法从指定字段读取对象引用,在需要它的系统上,插入一个内存屏障,阻止处理器重新排序内存操作,如果在代码中此方法之后出现读取或写入,则处理器无法在此方法之前移动它。

  • 在 ConcurrentDictionary 的更新方法中,对数据进行更新时,会判断该数据是否可以原子写入,如果时可以原子写入的,那么就直接更新数据,如果不是,那么会创建一个新的 node 覆盖原有 node,起初看到这里时候,我百思不得其解,不知道这么操作的目的,后面在 jeo duffy 的博客中 Thread-safety, torn reads, and the like 中找到了答案,这样操作时为了防止 torn reads(撕裂读取),什么叫撕裂读取呢?通俗地说,就是有的数据类型写入时,要分多次写入,写一次,移动一次指针,那么就有可能写了一半,这个结果被另外一个线程读取走了.比如说我把 刘振宇三个字改成周杰伦的过程中,我先改把刘改成周了,正在我准备去把振改成杰的时候,另外一个线程过来读取结果了,读到的数据是周振宇,这显然是不对的.所以对这种,更安全的做法是先把周杰伦三个字写好在一张纸条上,然后直接替换掉刘振宇.更多信息在 CLI 规范 12.6.6 有详细介绍.

  • checkedunckecked关键字.非常量的运算(non-constant)运算在编译阶段和运行时下不会做溢出检查,如下这样的代码时不会抛出异常的,算错了也不会报错。

int ten = 10; int i2 = 2147483647 + ten;

但是我们知道,int 的最大值是 2147483647,如果我们将上面这样的代码嵌套在checked就会做溢出检查了.

checked { int ten = 10; int i2 = 2147483647 + ten; }

相反,对于常量,编译时是会做溢出检查的,下面这样的代码在编译时就会报错的,如果我们使用unckeck标签进行标记,则在编译阶段不会做移除检查.

int a = int.MaxValue * 2;

那么问题来了,我们当然知道 checked 很有用,那么 uncheck 呢?如果我们只是需要那么一个数而已,至于溢出不溢出的关系不大,比如说生成一个对象的 HashCode,比如说根据一个算法计算出一个相对随机数,这都是不需要准确结果的,ConcurrentDictionary 中对于m_keyRehashCount++这个运算就使用了 unchecked,就是因为 m_keyRehashCount 是用来生成哈希值的,我们并不关心它有没有溢出.

  • volatile关键字,表示一个字段可能是由在同一时间执行多个线程进行修改。出于性能原因,编译器/运行时系统甚至硬件可以重新排列对存储器位置的读取和写入。声明的字段 volatile 不受这些优化的约束。添加 volatile 修饰符可确保所有线程都能按照执行顺序由任何其他线程执行的易失性写入,易失性写入是一件疯狂的事情的事情:普通玩家慎用.

本博客所涉及的代码都保存在 github 中,Take it easy to enjoy it!
https://github.com/liuzhenyulive/DictionaryMini/blob/master/DictionaryMini/DictionaryMini/ConcurrentDictionaryMini.cs

【注:本文源自网络文章资源,由站长整理发布】


web 前端中文站 , 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:ConcurrentDictionary 并发字典知多少?
喜欢 (0)
发表我的评论
取消评论
表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址