热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

白话算法(6.4)能让Dictionary比Hashtable慢600倍?——泛型Dictionary源码剖析(上)

泛型Dictionary和Hashtable都是散列表的一种实现。只不过Hashtable使用的是双重散列法(开放寻址法的一种),而Dictionary使用的是除法散列法+链接法处理碰

泛型 Dictionary 和 Hashtable 都是散列表的一种实现。只不过 Hashtable 使用的是双重散列法(开放寻址法的一种),而 Dictionary 使用的是除法散列法+链接法处理碰撞。知道了这一点,就可以写一个让 Dictionary 出丑的测试:

View Code
int SIZE = 10103;
IDictionary<int, int> dict = new Dictionary<int, int>(SIZE);
Hashtable h = new Hashtable(SIZE);

for (int i = 0; i <10000; i++)
{
dict.Add(i*SIZE, i);
h.Add(i*SIZE, i);
}

Stopwatch t = new Stopwatch();

t.Reset();
t.Start();
// 为了让测试效果明显,重复10万次查询
for (int i = 0; i <100000; i++)
{
dict.ContainsKey(SIZE);
}
t.Stop();
Console.WriteLine("Dictionary:" + t.Elapsed.TotalMilliseconds.ToString() + "毫秒");

t.Reset();
t.Start();
// 为了让测试效果明显,重复10万次查询
for (int i = 0; i <100000; i++)
{
h.ContainsKey(SIZE);
}
t.Stop();
Console.WriteLine("Hashtable:" + t.Elapsed.TotalMilliseconds.ToString() + "毫秒");

怎么样,效果很惊人吧?当然,计算倍数并没有什么实际意义,只不过听起来很恐怖,比较容易吸引眼球罢了。而且,这个测试对 Dictionary 是不公平的,毕竟是我故意制造了一个让 Dictionary 产生1万次碰撞的输入。一般情况下,Dictionary 和 Hashtable 的效率是不相上下的,在 Key 是原生类型的情况下 Dictionary 还略快一些。不过,这个测试也说明开放寻址法自有其优点,毕竟我们难以制造一个能让 Hashtable 出这么大的丑的测试。
  上面那个测试让人心中不爽,但是在实际使用时一般不会有太大问题。当容量比较小时,使用除法散列法确实容易产生碰撞,但是就算达到极端的 O(n) 查找又如何——把百八十个项全部遍历一遍也用不了多少时间。当容量较大时,例如容量是 10103 时,要每隔 10103 个数字才发生一次碰撞,如果可以假设实际添加到字典中的项都是比较靠近的,就不会发生大量碰撞。
  在阅读 Dictionary 源代码之前,我们先来做个独立思考,如何把上一篇使用双重散列法实现的 HashSet4 改造成泛型字典?

HashSet4 到泛型字典

  泛型字典在功能上与 Hashtable 的不同之处主要有2点:1)它的 Key 和 Value 都是泛型的;2)遍历 Key 得到的序列与添加时的顺序一致。
  先看第一点:泛型。如果把 HashSet4 的 Key 的类型由 Object 改成泛型参数 TKey,就无法把 null 和 _bucket 赋值给 Key 了,这两个特殊的值分别表示槽是空槽和标记为删除的槽,所以要为 Bucket 增加一个状态属性,并修改 MarkDeleted() 和 IsEmputyOrDeleted() 函数:

View Code
public class HashSet4
{
[DebuggerDisplay("Key = {Key} k = {k} IsCollided = {IsCollided}")]
private struct Bucket
{
public TKey Key;
private int _k; // Store hash code; sign bit means there was a collision.
public BucketState State;
public int k
{
get { return _k & 0x7FFFFFFF; } // 返回去掉最高的碰撞位之后的 _k
set
{
_k &= unchecked((int)0x80000000); // 将 _k 除了最高的碰撞位之外的其它位全部设为0
_k |= value; // 保持 _k 的最高的碰撞位不变,将 value 的值放到 _k 的后面几位中去
}
}
// 是否发生过碰撞
public bool IsCollided
{
get { return (_k & unchecked((int)0x80000000)) != 0; } // _k 的最高位如果为1表示发生过碰撞
}
// 将槽标记为发生过碰撞
public void MarkCollided()
{
_k |= unchecked((int)0x80000000); // 将 _k 的最高位设为1
}

public void MarkDeleted()
{
State = BucketState.Deleted;
k = 0;
}

public bool IsEmputyOrDeleted()
{
return State == BucketState.Empty || State == BucketState.Deleted;
}
}
private enum BucketState
{
Empty = 0,
Full = 1,
Deleted = 2
}
// ...
}

Rehash()、Add()、Contains()、Remove() 也要做相应的修改:

public class HashSet4
{
// 将老数组中的项在大小为 newSize 的新数组中重新排列
private void Rehash(int newSize)
{
_occupancy = 0; // 将标记为碰撞的槽的数量重新设为0
Bucket[] newBuckets = new Bucket[newSize]; // 新数组
// 将老数组中的项添加到新数组中
for (int oldIndex = 0; oldIndex <_buckets.Length; oldIndex++)
{
if (_buckets[oldIndex].IsEmputyOrDeleted())
continue; // 跳过已经删除的槽好空槽

// 向新数组添加项
int i = 0; // 已经探查过的槽的数量
do
{
int j = DH(_buckets[oldIndex].k, i, newBuckets.Length); // 想要探查的地址
if (newBuckets[j].IsEmputyOrDeleted())
{
newBuckets[j].Key = _buckets[oldIndex].Key;
newBuckets[j].k = _buckets[oldIndex].k;
newBuckets[j].State = BucketState.Full;
break;
}
else
{
if (newBuckets[j].IsCollided == false)
                                                                                                                                            

接下来考虑如何保留添加时的顺序。可以为 Bucket 添加两个整型变量 Prev 和 Next 来模拟双向链表,添加时的顺序将保存在链表里。Prev 保存前一被添加的项的下标,Next 保存下一个被添加的项的下标,-1 表示没有前驱或后继。还要在 HashSet4 里增加整型变量 _first 和 _last 保持第一个和最后一个被添加的项,以及向链表添加和删除项的函数:

public class HashSet4 : IEnumerable
{
private int _first = -1; // 第一个节点下标
private int _last = -1; // 最后一个节点下标
// 将_buckets[index]追加到链表末尾
private void AppendToLink(int index)
{
if (_first == -1)
_first = index;
if (_last == -1)
{
_buckets[index].Prev = -1;
_buckets[index].Next = -1;
}
else
{
_buckets[_last].Next = index;
_buckets[index].Prev = _last;
_buckets[index].Next = -1;
}
_last = index;
}
// 将_buckets[index] 从链表中移除
private void RemoveFromLink(int index)
{
if (_first == index)
_first = _buckets[index].Next;
if (_last == index)
_last = _buckets[index].Prev;
if (_buckets[index].Prev != -1)
_buckets[_buckets[index].Prev].Next = _buckets[index].Next;
if (_buckets[index].Next != -1)
_buckets[_buckets[index].Next].Prev = _buckets[index].Prev;
}
// ...
}

再在 Add() 时调用 AppendToLink(j),在 Remove() 时调用 RemoveFromLink(j) 就可以完成链表的维护了。然后再让 HashSet4 实现 IEnumerable 就大功告成了:

public class HashSet4 : IEnumerable
{
public IEnumerator GetEnumerator()
{
return new Enumerator(this);
}

IEnumerator IEnumerable.GetEnumerator()
{
return new Enumerator(this);
}

public struct Enumerator : IEnumerator
{
private HashSet4 _set;
private TKey _current;
private int _index;

public Enumerator(HashSet4 set)
{
_set = set;
_current = default(TKey);
_index = set._first;
}
public TKey Current
{
get { return _current; }
}
public void Dispose()
{
}
object IEnumerator.Current
{
get { return _current; }
}
public bool MoveNext()
{
if (_index != -1)
{
_current = _set._buckets[_index].Key;
_index = _set._buckets[_index].Next;
return true;
}

_current = default(TKey);
return false;
}
public void Reset()
{
_index = _set._first;
_current = default(TKey);
}
}
// ...
}

附上 HashSet4 完整源码:

public class HashSet4 : IEnumerable
{
[DebuggerDisplay("Key = {Key} k = {k} IsCollided = {IsCollided} Prev = {Prev} Next = {Next}")]
private struct Bucket
{
public TKey Key;
private int _k; // Store hash code; sign bit means there was a collision.
public BucketState State;
public int Prev; // 前驱节点下标
public int Next; // 后继节点下标
public int k
{
get { return _k & 0x7FFFFFFF; } // 返回去掉最高的碰撞位之后的 _k
set
{
_k &= unchecked((int)0x80000000); // 将 _k 除了最高的碰撞位之外的其它位全部设为0
_k |= value; // 保持 _k 的最高的碰撞位不变,将 value 的值放到 _k 的后面几位中去
}
}
// 是否发生过碰撞
public bool IsCollided
{
get { return (_k & unchecked((int)0x80000000)) != 0; } // _k 的最高位如果为1表示发生过碰撞
}
// 将槽标记为发生过碰撞
public void MarkCollided()
{
_k |= unchecked((int)0x80000000); // 将 _k 的最高位设为1
}

public void MarkDeleted()
{
State = BucketState.Deleted;
k = 0;
}

public bool IsEmputyOrDeleted()
{
return State == BucketState.Empty || State == BucketState.Deleted;
}
}
private enum BucketState
{
Empty = 0,
Full = 1,
Deleted = 2
}
private Bucket[] _buckets;
private int GetHashCode(Object key)
{
return key.GetHashCode() & 0x7FFFFFFF;
}

private int _count = 0; // 添加到数组中的项的总数
private int _occupancy = 0; // 被标记为碰撞的槽的总数
private float _loadFactor = 0.72f; // 装载因子
private int _loadsize = 0; // 数组允许放置的项的数量上限,_loadsize = _bucket.Length * _loadFactor,在确定数组的大小时被初始化。
private int _first = -1; // 第一个节点下标
private int _last = -1; // 最后一个节点下标
// 将_buckets[index]追加到链表末尾
private void AppendToLink(int index)
{
if (_first == -1)
_first = index;
if (_last == -1)
{
_buckets[index].Prev = -1;
_buckets[index].Next = -1;
}
else
{
_buckets[_last].Next = index;
_buckets[index].Prev = _last;
_buckets[index].Next = -1;
}
_last = index;
}
// 将_buckets[index] 从链表中移除
private void RemoveFromLink(int index)
{
if (_first == index)
_first = _buckets[index].Next;
if (_last == index)
_last = _buckets[index].Prev;
if (_buckets[index].Prev != -1)
_buckets[_buckets[index].Prev].Next = _buckets[index].Next;
if (_buckets[index].Next != -1)
_buckets[_buckets[index].Next].Prev = _buckets[index].Prev;
}
public HashSet4(int capacity)
{
int size = GetPrime((int)(capacity / _loadFactor));
if (size <11)
size = 11; // 避免过小的size
_buckets = new Bucket[size];
_loadsize = (int)(size * _loadFactor);
}

// 扩张容器
private void Expand()
{
int newSize = GetPrime(_buckets.Length * 2); // buckets.Length*2 will not overflow
Rehash(newSize);
}

// 将老数组中的项在大小为 newSize 的新数组中重新排列
private void Rehash(int newSize)
{
_occupancy = 0; // 将标记为碰撞的槽的数量重新设为0
Bucket[] newBuckets = new Bucket[newSize]; // 新数组
// 将老数组中的项添加到新数组中
for (int oldIndex = 0; oldIndex <_buckets.Length; oldIndex++)
{
if (_buckets[oldIndex].IsEmputyOrDeleted())
continue; // 跳过已经删除的槽好空槽

// 向新数组添加项
int i = 0; // 已经探查过的槽的数量
do
{
int j = DH(_buckets[oldIndex].k, i, newBuckets.Length); // 想要探查的地址
if (newBuckets[j].IsEmputyOrDeleted())
{
newBuckets[j].Key = _buckets[oldIndex].Key;
newBuckets[j].k = _buckets[oldIndex].k;
newBuckets[j].State = BucketState.Full;
break;
}
else
{
if (newBuckets[j].IsCollided == false)
{
newBuckets[j].MarkCollided();
_occupancy++;
}
i += 1;
}
} while (true);
}
// 用新数组取代老数组
_buckets = newBuckets;
_loadsize = (int)(newSize * _loadFactor);
}

// 质数表
private readonly int[] primes = {
3, 7, 11, 17, 23, 29, 37, 47, 59, 71, 89, 107, 131, 163, 197, 239, 293, 353, 431, 521, 631, 761, 919,
1103, 1327, 1597, 1931, 2333, 2801, 3371, 4049, 4861, 5839, 7013, 8419, 10103, 12143, 14591,
17519, 21023, 25229, 30293, 36353, 43627, 52361, 62851, 75431, 90523, 108631, 130363, 156437,
187751, 225307, 270371, 324449, 389357, 467237, 560689, 672827, 807403, 968897, 1162687, 1395263,
1674319, 2009191, 2411033, 2893249, 3471899, 4166287, 4999559, 5999471, 7199369};

// 判断 candidate 是否是质数
private bool IsPrime(int candidate)
{
if ((candidate & 1) != 0) // 是奇数
{
int limit = (int)Math.Sqrt(candidate);
for (int divisor = 3; divisor <= limit; divisor += 2) // divisor = 3、5、7...candidate的平方根
{
if ((candidate % divisor) == 0)
return false;
}
return true;
}
return (candidate == 2); // 除了2,其它偶是全都不是质数
}
// 如果 min 是质数,返回 min;否则返回比 min 稍大的那个质数
private int GetPrime(int min)
{
// 从质数表中查找比 min 稍大的质数
for (int i = 0; i )
{
int prime = primes[i];
if (prime >= min) return prime;
}

// min 超过了质数表的范围时,探查 min 之后的每一个奇数,直到发现下一个质数
for (int i = (min | 1); i 2)
{
if (IsPrime(i))
return i;
}
return min;
}

private int H1(int k, int m)
{
return k % m;
}
private int H2(int k, int m)
{
return 1 + (k % (m - 1));
}
private int DH(int k, int i, int m)
{
return (H1(k, m) + i * H2(k, m)) % m;
}

public void Add(TKey item)
{
if (_count >= _loadsize)
Expand(); // 如果添加到数组中的项数已经到达了上限,要先扩张容器
else if (_occupancy > _loadsize && _count > 100)
Rehash(_buckets.Length); // 如果被标记为碰撞的槽的数量和 _loadsize 一般多,就要重新排列所有的项

int i = 0; // 已经探查过的槽的数量
int k = GetHashCode(item);
do
{
int j = DH(k, i, _buckets.Length); // 想要探查的地址
if (_buckets[j].IsEmputyOrDeleted())
{
_buckets[j].Key = item;
_buckets[j].k = k; // 仍然保留 _k 的最高位不变
_buckets[j].State = BucketState.Full;
AppendToLink(j);
_count++;
return;
}
else
{
if (_buckets[j].IsCollided == false)
{
_buckets[j].MarkCollided();
_occupancy++;
}
i += 1;
}
} while (i <= _buckets.Length);
throw new Exception("集合溢出");
}
public bool Contains(TKey item)
{
int i = 0; // 已经探查过的槽的数量
int j = 0; // 想要探查的地址
int hashCode = GetHashCode(item);
do
{
j = DH(hashCode, i, _buckets.Length);
if (_buckets[j].State == BucketState.Empty)
return false;

if (_buckets[j].k == hashCode && _buckets[j].Key.Equals(item))
return true;
else
i += 1;
} while (i <= _buckets.Length);
return false;
}
public void Remove(TKey item)
{
int i = 0; // 已经探查过的槽的数量
int j = 0; // 想要探查的地址
int hashCode = GetHashCode(item);
do
{
j = DH(hashCode, i, _buckets.Length);
if (_buckets[j].State == BucketState.Empty)
return;

if (_buckets[j].k == hashCode && _buckets[j].Key.Equals(item)) // 找到了想要删除的项
{
if (_buckets[j].IsCollided)
{ // 如果不是碰撞链的最后一个槽,要把槽标记为已删除
_buckets[j].MarkDeleted();
}
else
{ // 如果是碰撞链的最后一个槽,直接将 Key 设为 null
_buckets[j].State = BucketState.Empty;
_buckets[j].k = 0;
}
RemoveFromLink(j);
_count--;
return;
}
else
{
i += 1;
}
} while (i <= _buckets.Length);
}

public IEnumerator GetEnumerator()
{
return new Enumerator(this);
}

IEnumerator IEnumerable.GetEnumerator()
{
return new Enumerator(this);
}

public struct Enumerator : IEnumerator
{
private HashSet4 _set;
private TKey _current;
private int _index;

public Enumerator(HashSet4 set)
{
_set = set;
_current = default(TKey);
_index = set._first;
}
public TKey Current
{
get { return _current; }
}
public void Dispose()
{
}
object IEnumerator.Current
{
get { return _current; }
}
public bool MoveNext()
{
if (_index != -1)
{
_current = _set._buckets[_index].Key;
_index = _set._buckets[_index].Next;
return true;
}

_current = default(TKey);
return false;
}
public void Reset()
{
_index = _set._first;
_current = default(TKey);
}
}
}

 

如此看来,让使用了双重散列法的 HashSet4 变成泛型以及保留添加顺序并不十分困难(代价是为每一个槽增加了2个整型变量和一个枚举变量,是不是还有更好的方法呢?),不知道为什么 Dictionary 没有沿用 Hashtable 的算法。难道是看中了除法散列法+链接法常数因子比较小的优点?
  鉴于本篇已经够长的了,我们下一篇再详细解析 Dictionary 的源码。提前作个预告:虽然 Dictionary 是使用的除法散列法+链接法,但是并没有使用真正的链表,而是用一个数组模拟链表,即节省了空间,又能够保留添加时的顺序,可谓一举两得。


推荐阅读
author-avatar
倔强的石头二虎
这个家伙很懒,什么也没留下!
Tags | 热门标签
RankList | 热门文章
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有