count distinct是如何实现的

count实现:

count(1|*)实现比较容易,只要设置一个计算器,对每条记录依次加1,需要的内存空间为一个Int/Long占用的空间。

pike中count 代码:

https://github.com/PPTV/Pike/blob/master/pike/src/main/java/com/pplive/pike/function/builtin/Count.java

//CountState中使用long计数,聚合时直接相加
public CountState combine(CountState left, CountState right) {
    if (left == null) 
        return right;
    if (right == null)
        return left;
    return new CountState(left.count() + right.count());
}

count distinct实现:

distinct需要进行去重,最直观的想法就是用HashSet保存需要去重的参数,最终统计输出结果时求HashSet的size即可,若采用map分布式计数,则先reduce合并成一个HashSet再求size。

pike中使用HashSet求distinct count代码:
https://github.com/PPTV/Pike/blob/master/pike/src/main/java/com/pplive/pike/function/builtin/DistinctFilter.java

//添加去重参数,
public boolean addIfNew(Object obj) {
    if(obj == null)
        return false;
    boolean result = this._values.add(obj);
    return result;
}

//返回结果
public long size() {
    return this._values.size();
}

//分布式,先合并
public static DistinctFilter merge(ISizeAwareIterable<DistinctFilter> distinctFilters) {
    if (distinctFilters.size() == 1) {
        return distinctFilters.iterator().next();
    }
    HashSet<Object> values = new HashSet<Object>();
    for(DistinctFilter f : distinctFilters) {
        values.addAll(f._values);
    }
    return new DistinctFilter(values);
}

这种方式缺点很明显,就是当数据量比较大时,很容易就会出现OOM。

然而,很多应用场景其实并不需要精确的去重,比如求页面的UV, 我们一般关注的是类似首页这种重点页面的UV, 这种页面一般访问量大,我们更关心uv的数量级以及变化趋势。对于这种一般的统计需求,聪明的数学家们基于概率统计的知识发明了基数统计算法。

基数概念和估算算法以下链接系列文章中介绍的很详细,在此,从工程的角度研究下几种算法的实现。

http://www.voidcn.com/article/p-mnegwpnf-nc.html

Pike中实现了三个版本的distinc count, 分别对应linercount、loglogadaptivecount以及hyperloglogcount算法,从名字可以看出算法的空间复杂度,工程上Pike直接copy了stream-lib的实现,加上implements Serializable接口,实现了分布式distinct count。

LinerCount

LinerCount是最简单的基数估算算法,空间复杂度上有常数级别的降低,算法步骤可参考此文章:

http://blog.codinglabs.org/articles/algorithms-for-cardinality-estimation-part-ii.html

Pike中的实现:
https://github.com/PPTV/Pike/blob/master/pike/src/main/java/com/pplive/pike/function/builtin/LinearCountState.java

linercount算法实现部分代码也很容易看懂, Pike借助linercount算法实现了两个版本的UDAF.

  • LinearCount(maxCardinality, expression) 已知Max Cardinality,误差低于1%
  • LinearCountEx(bitMapBytes, expression) 指定存储Hash Value的byte[]长度,越长误差越小

LogLogAdaptiveCount

LogLogAdaptiveCount根据LogLogCount空桶率决定是使用LinerCount还是LogLogCount,LogLogCount空间复杂度O(log2(log2(Nmax))),极大降低了空间开销,LogLogCount算法参考:

http://blog.codinglabs.org/articles/algorithms-for-cardinality-estimation-part-iii.html

Pike中LogLogAdaptiveCount实现:

https://github.com/PPTV/Pike/blob/master/pike/src/main/java/com/pplive/pike/function/builtin/LoglogAdaptiveCountState.java

代码同样比较容易就能看懂,构造LogLogAdaptiveCount计数器,只需指定hash value中用于计算bucket index的bit位数k.

  • LoglogAdaptiveCount(k, expression) 指定LogLogAptiveCount桶的个数为1<< k, 哈希值二进制前k位为桶index

HyperLogLogCount

HyperLogLog也是基于LogLogCount算法,相对LogLogAdaptiveCount,hyperloglogcount使用两种方式对误差进行修正:

  • 使用调和平均数替代几何平均数,过滤掉桶中偏差较大的值
  • 使用一种分阶段修正算法

目前进行基数估计的工程实践中基本都使用的是HyperLogLog算法,关于HyperLogLog算法以下文章介绍的清晰明了:

http://rainybowe.com/blog/2017/07/13/神奇的HyperLogLog算法/index.html

Pike中HyperLogLog算法实现:

https://github.com/PPTV/Pike/blob/master/pike/src/main/java/com/pplive/pike/function/builtin/HyperLoglogCountState.java

此实现同样copy自stream-lib, 但代码相对稍微难理解,主要是使用RegisterSet作为存储桶信息数据结构,RegisterSet中包含Int数组,为了竟可能节省空间,一个Int的32bit又被划分成了6个bucket,每个bucket占用5bit, bucket中存储的是hash value中去除计算bucket index的bit后第一个1出现的位置。

相较stream-lib的实现, Kylin老版本的实现中每个bucket占用1byte,存储范围更广,可能空间利用率没有那么高效,但代码却清晰易读多了,从注释可看出kylin实现参考了Presto。

Kylin HyperLogLog源码:

https://github.com/apache/kylin/blob/master/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounterOld.java

目前新版本Kylin也对HyperLogLog内存空间进行了优化。

Pike中使用HyperLogLog的UDAF参数意义与LoglogAdaptive完全相同:

  • HyperLogLogCount(k, expression) 指定HyperLogLogCount桶的个数为1<< k, 哈希值二进制前k位为桶index

总结

三种算法依次渐进,计算越来越精确,HyperLogLog基本已经成为基数估计的标准算法,结合pike中的move/accumulate函数,使用少量的内存即可方便实现各种实时场景下的基数估计需求。

相关文章

相关标签/搜索