数仓系列-Kylin概览

Kylin的特点

Kylin的主要特点包括支持SQL接口、支持超大规模数据集、亚秒级响应、可伸缩性、高吞吐率、BI工具集成等。

  1. 标准SQL接口:Kylin是以标准的SQL作为对外服务的接口。
  2. 支持超大数据集:Kylin对于大数据的支撑能力可能是目前所有技术中最为领先的。早在2015年eBay的生产环境中就能支持百亿记录的秒级查询,之后在移动的应用场景中又有了千亿记录秒级查询的案例。
  3. 亚秒级响应:Kylin拥有优异的查询相应速度,这点得益于预计算,很多复杂的计算,比如连接、聚合,在离线的预计算过程中就已经完成,这大大降低了查询时刻所需的计算量,提高了响应速度。
  4. 可伸缩性和高吞吐率:单节点Kylin可实现每秒70个查询,还可以搭建Kylin的集群。
  5. BI工具集成: Kylin可以与现有的BI工具集成,具体包括如下内容。
    1. ODBC:与Tableau、Excel、PowerBI等工具集成
    2. JDBC:与Saiku、BIRT等Java工具集成
    3. RestAPI:与JavaScript、Web网页集成

Kylin的基本架构

image.png

REST Server
REST Server是一套面向应用程序开发的入口点,旨在实现针对Kylin平台的应用开发工作。 此类应用程序可以提供查询、获取结果、触发cube构建任务、获取元数据以及获取用户权限等等。另外可以通过Restful接口实现SQL查询。

查询引擎(Query Engine)
当cube准备就绪后,查询引擎就能够获取并解析用户查询。它随后会与系统中的其它组件进行交互,从而向用户返回对应的结果。

Routing
负责将解析的SQL生成的执行计划转换成cube缓存的查询,cube是通过预计算缓存在HBase中,这部分查询可以在秒级设置毫秒级完成

元数据管理工具(Metadata)
Kylin是一款元数据驱动型应用程序。元数据管理工具是一大关键性组件,用于对保存在Kylin当中的所有元数据进行管理,其中包括最为重要的cube元数据。其它全部组件的正常运作都需以元数据管理工具为基础。 Kylin的元数据存储在HBase中。

任务引擎(Cube Build Engine
这套引擎的设计目的在于处理所有离线任务,其中包括shell脚本、Java API以及Map Reduce任务等等。任务引擎对Kylin当中的全部任务加以管理与协调,从而确保每一项任务都能得到切实执行并解决其间出现的故障。

Kylin工作原理

Apache Kylin的工作原理本质上是MOLAP(Multidimension On-Line Analysis Processing)Cube,也就是多维立方体分析,详细概念参考: 数仓系列-基本概念整理

核心算法

对数据模型做Cube预计算,并利用计算的结果加速查询:

  • 指定数据模型,定义维度和度量;

  • 预计算Cube,计算所有Cuboid并保存为物化视图;
    预计算过程是Kylin从Hive中读取原始数据,按照我们选定的维度进行计算,并将结果集保存到HBase中,默认的计算引擎为MapReduce,可以选择Spark作为计算引擎。一次build的结果,我们称为一个Segment。构建过程中会涉及多个Cuboid的创建,具体创建过程由kylin.cube.algorithm参数决定,参数值可选 auto,layer 和 inmem, 默认值为 auto,即 Kylin 会通过采集数据动态地选择一个算法 (layer or inmem),如果用户很了解 Kylin 和自身的数据、集群,可以直接设置喜欢的算法。

  • 执行查询,读取Cuboid,运行,产生查询结果

layer(逐层构建)算法

image.png

一个N维的Cube,是由1个N维子立方体、N个(N-1)维子立方体、N*(N-1)/2个(N-2)维子立方体、……、N个1维子立方体和1个0维子立方体构成,总共有2^N 个子立方体组成,在逐层算法中,按维度数逐层减少来计算,每个层级的计算(除了第一层,它是从原始数据聚合而来),是基于它上一层级的结果来计算的。比如,[Group by A, B]的结果,可以基于[Group by A, B, C]的结果,通过去掉C后聚合得来的;这样可以减少重复计算;当 0维度Cuboid计算出来的时候,整个Cube的计算也就完成了。

注意:
每一轮的计算都是一个MapReduce任务,且串行执行;一个N维的Cube,至少需要N次MapReduce Job

算法优点:

  1. 此算法充分利用了MapReduce的能力,处理了中间复杂的排序和洗牌工作,故而算法代码清晰简单,易于维护;
  2. 受益于Hadoop的日趋成熟,此算法对集群要求低,运行稳定;在内部维护Kylin的过程中,很少遇到在这几步出错的情况;即便是在Hadoop集群比较繁忙的时候,任务也能完成。

算法缺点:

  1. 当Cube有比较多维度的时候,所需要的MapReduce任务也相应增加;由于Hadoop的任务调度需要耗费额外资源,特别是集群较庞大的时候,反复递交任务造成的额外开销会相当可观;
  2. 由于Mapper不做预聚合,此算法会对Hadoop MapReduce输出较多数据; 虽然已经使用了Combiner来减少从Mapper端到Reducer端的数据传输,所有数据依然需要通过Hadoop MapReduce来排序和组合才能被聚合,无形之中增加了集群的压力;
  3. 对HDFS的读写操作较多:由于每一层计算的输出会用做下一层计算的输入,这些Key-Value需要写到HDFS上;当所有计算都完成后,Kylin还需要额外的一轮任务将这些文件转成HBase的HFile格式,以导入到HBase中去;

总体而言,该算法的效率较低,尤其是当Cube维度数较大的时候。

inmem(快速构建)算法

也被称作“逐段”(By Segment) 或“逐块”(By Split) 算法,从1.5.x开始引入该算法,利用Mapper端计算先完成大部分聚合,再将聚合后的结果交给Reducer,从而降低对网络瓶颈的压力。该算法的主要思想是:

  • 对Mapper所分配的数据块,将它计算成一个完整的小Cube 段(包含所有Cuboid);
  • 每个Mapper将计算完的Cube段输出给Reducer做合并,生成大Cube,也就是最终结果;

如图所示解释了此流程。
image.png

与layer(逐层构建)算法相比,快速算法主要有两点不同:

  • Mapper会利用内存做预聚合,算出所有组合;Mapper输出的每个Key都是不同的,这样会减少输出到Hadoop MapReduce的数据量,Combiner也不再需要;
  • 一轮MapReduce便会完成所有层次的计算,减少Hadoop任务的调配

Kylin Cube的构建过程

如图,是一个构建Cube的Job过程:
image.png

几个重要流程分析

从Hive表生成Base Cuboid

在实际的cube构建过程中,会首先根据cube的Hive事实表和维表生成一张大宽表,然后计算大宽表列的基数,建立维度字典,估算cuboid的大小,建立cube对应的HBase表,再计算base cuboid。

计算base cuboid就是一个MapReduce作业,其输入是上面提到的Hive大宽表,输出是的key是各种维度组合,value是Hive大宽表中指标的值

核心源码: org.apache.kylin.engine.mr.steps.BaseCuboidMapperBase:

1
2
3
4
5
6
7
8
9
// map 阶段生成key-value的代码
protected void outputKV(String[] flatRow, Context context) throws IOException, InterruptedException {
byte[] rowKey = baseCuboidBuilder.buildKey(flatRow);
outputKey.set(rowKey, 0, rowKey.length);

ByteBuffer valueBuf = baseCuboidBuilder.buildValue(flatRow);
outputValue.set(valueBuf.array(), 0, valueBuf.position());
context.write(outputKey, outputValue);
}

从HBase Cuboid 逐层计算 Cuboid

核心源码: org.apache.kylin.engine.mr.steps.CuboidReducer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public void doReduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
aggs.reset();

for (Text value : values) {
if (vcounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
logger.info("Handling value with ordinal (This is not KV number!): " + vcounter);
}
codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), input);
aggs.aggregate(input, needAggrMeasures);
}
aggs.collectStates(result);

ByteBuffer valueBuf = codec.encode(result);

outputValue.set(valueBuf.array(), 0, valueBuf.position());
context.write(key, outputValue);
}

Cuboid 转化为HBase的HFile

参考: Hive 数据 bulkload 导入 HBase

Kylin的部署以及基本使用

参考官方链接: Apache Kylin

补充:
在实践过程中,我们采用 Nginx + Kylin集群方案(参考《Kylin的集群部署模式部署》),但由于资源紧缺, Kylin集群和 Hadoop集群在统一的服务器上,经常会出现某个时刻i资源紧张导致 Kylin进程崩溃,甚至拖垮HBase的问题,因此建议部署单独的Kylin集群,同时构建好监控机制

Kylin的优化

Kylin核心在于Cube,如何设计一个好的Cube对 Kylin的性能来说便是至关重要的, 影响Cube膨胀率和构建时间的重要因素主要有以下几个方面:

  • Cube中的维度数量较多,且没有进行很好的Cuboid剪枝优化,导致Cuboid数量极多;

  • Cube中存在较高基数的维度,导致包含这类维度的每一个Cuboid占用的空间都很大,这些Cuboid累积造成整体Cube体积变大;

  • 存在比较占用空间的度量,例如Count Distinct,因此需要在Cuboid的每一行中都为其保存一个较大的寄存器,最坏的情况将会导致Cuboid中每一行都有数十KB,从而造成整个Cube的体积变大;

优化思路

维度优化

维度优化手段

  • 聚合组
  • 衍生维度
  • 强制维度
  • 层次维度
  • 联合维度
  • Extended Column

详细参考: Apache Kylin 优化指南

并发粒度优化

当Segment中某一个Cuboid的大小超出一定的阈值时,系统会将该Cuboid的数据分片到多个分区中,以实现Cuboid数据读取的并行化,从而优化Cube的查询速度。

构建引擎根据Segment估计的大小,以及参数“kylin.hbase.region.cut”的设置决定Segment在存储引擎中总共需要几个分区来存储,如果存储引擎是HBase,那么分区的数量就对应于HBase中的Region数量。kylin.hbase.region.cut的默认值是5.0,单位是GB,也就是说对于一个大小估计是50GB的Segment,构建引擎会给它分配10个分区。用户还可以通过设置kylin.hbase.region.count.min(默认为1)和kylin.hbase.region.count.max(默认为500)两个配置来决定每个Segment最少或最多被划分成多少个分区。

由于每个Cube的并发粒度控制不尽相同,因此建议构建cube的时候为每个Cube量身定制控制并发粒度的参数。

参考链接

推荐 编程小梦|康凯森的技术博客, 博主是Apache Kylin等多个开源项目的 commiter, 博客有很多关于 OLAP 离线大数据方向的优质文章