近年来,分布式流处理系统作为处理持续数据流的重要工具正在快速发展,诸如Storm[1]、Apache Flink[2]等分布式流处理系统具有低延迟、高扩展性等特点[3],成为了许多大数据公司和研究机构的首选。
通常来说,一个流处理应用的拓扑结构被组织成一个有向无环图,其中每个顶点代表系统中的一个算子,每条边代表数据的流动。流处理系统所处理的数据通常被称为元组。为了快速处理大量数据,提高系统的吞吐量,数据流被分组为若干不相交的子流并由算子的若干实例进行并行处理。分组算法能够通过对数据流的分组,提高系统的吞吐量,降低元组的平均处理延迟。
本文提出了一种基于时间感知的分组算法。该算法根据不同实例处理时间和通信时间的差异性并综合考虑数据流的倾斜性进行分组,相较于已有分组算法提高了系统性能。本文首先对分布式流处理系统存在的网络异构性和处理能力异构性进行了分析;然后,基于系统的异构性,提出了时间感知分组算法(time-aware grouping,TAG),并分析了算法的不均衡性和算法所引入的内存和计算开销;最后,在Apache Flink分布式流处理平台上实现了时间感知算法,并在吞吐量和平均处理延迟等系统性能指标上与已有算法进行了对比分析。实验结果表明,时间感知算法相较于已有算法在系统吞吐量上提高了10%,在平均处理延迟上降低了33%。
1 相关研究当前分布式流处理系统分组策略的研究对象主要是有状态的算子[4-6]。在早期研究中,研究者主要考虑如何应对数据流分布的倾斜性,以均衡系统负载。Nasir等[7]提出了能够将倾斜数据流中的高频键值分到两个实例中处理的局部键值分组(partial key grouping,PKG)算法,但该算法不能应对倾斜程度很高的数据流。Rivetti等[8]提出了分布感知键值分组(distribution-aware key grouping,DKG)算法。DKG算法通过估计出高频键值的频率及贪心调度得到键值到目标实例的映射关系。DKG算法由于将具有相同键值的元组都分到同一实例中,不能有效应对倾斜较大的数据流。
近几年,研究者们越来越关注其他因素对分组策略的影响。Chen等[9]提出了网络感知分组(network-aware grouping,NAG)算法。NAG算法通过对不同的网络位置的实例给定不同的权值并根据网络优先级进行分组,同时为了适应网络变化,加入了动态调整权值的策略。Pacaci等[10]提出了一种分布感知分组(distribution-aware grouping,DAG)算法。DAG算法综合考虑内存开销和元组数量的不均衡性,采用贪心策略进行分组,在不均衡性和内存开销上达到了很好的平衡。
上述的分组算法都是以每个并行实例的元组数量相同作为优化目标,却忽视了系统异构性对分组策略造成的影响。本文提出的时间感知分组算法则综合考虑了系统的异构性,使用时间窗口动态调整分组策略以适应数据流分布的变化,能够在较小的开销下使系统达到负载均衡。
2 分布式流处理系统异构性分析分布式流处理系统的异构性主要包括网络异构性和处理能力异构性。
网络异构主要是由不同算子之间通信距离的不同造成的。表 1总结了不同通信距离之间的网络性能差异[9]。
位置 | 通信 | 方式 | 延迟/μs | 带宽/(Mb·s-1) |
进程本地 | 线程间 | 共享内存 | 0.1 | 20 000 |
节点本地 | 进程间 | 回环设备 | 20 | 2 500 |
机架本地 | 节点间 | 1-跳路由器 | 200 | 100 |
其他位置 | 机架间 | N-跳路由器 | 350 | 10 |
在实际的分布式环境中,上下游算子的实例数随着应用拓扑规模的扩大而增大,导致上下游算子实例之间会出现线程间、进程间、节点间和机架间多种通信距离,从而造成了网络异构性。
随着分布式流处理系统的发展,往往会出现同一算子不同实例处理能力异构的情况[11]。处理能力异构通常是由同一类型计算资源分配不同造成的,如多核处理器的核数差异或不同类型的计算资源如中央处理单元(CPU)、图形处理单元(GPU)和现场可编程门阵列(FPGA)处理能力的差别。处理能力的不同会造成不同实例处理数据的速率不同,即单位时间内处理的元组数量不同,从而对分组策略造成影响。
由以上分析可知,分布式流处理系统中存在着不可忽视的网络异构性和处理能力异构性,系统的异构性使得上游算子从发射元组到不同的下游算子实例处理结束所经过的网络传输时间以及元组处理时间不同。基于此本文提出了时间感知分组算法。
3 时间感知分组算法 3.1 系统模型流处理系统中每个算子通常都实现了一系列预定义的处理逻辑,比如连接、聚合、过滤或者其他的用户自定义函数。设流s的上游算子为u,下游算子为o。在运行时,算子o被分为若干个实例并行执行,设实例集合G={o1, o2, …, on},其中n表示并行度,如图 1所示。对于分组函数p,设元组集合E={e1, e2, …, em},首先将s划分成若干子流,设子流集合为S={s1, s2, …, sk},并建立映射关系F1:E→S,然后建立映射关系F1:S→G,将子流分配给指定实例,即对任意一元组ei,p(ei)=F2(F1(ei))=oj, oj∈I。
定义元组从上游算子发出直到下游算子处理完毕所经过的时间为完成时间。设下游算子的并行度为n,实例oi的平均完成时间为ti,单位时间内上游算子输出的元组数量为m,实例oi所分配的元组数量为mi,即
$ \forall 1 \le i < j \le n, \;{m_i}{t_i} = {m_j}{t_j}. $ | (1) |
与文[10]相似,定义不均衡性为:
$ I = \frac{{\max \left( {{l_i}} \right) - {\mathop{\rm avg}\nolimits} \left( {{l_i}} \right)}}{{{\mathop{\rm avg}\nolimits} \left( {{l_i}} \right)}}, 1 \le i \le n. $ | (2) |
系统的异构性对分组策略存在影响,下面通过图 2所示的例子进行说明。假定下游算子有3个并行实例o1、o2、o3,对应的平均完成时间分别为t1、t2、t3,且存在关系t1>t2>t3。p1代表基于负载均衡的分组算法,为每个实例分配相同的元组数量,但由于每个实例平均完成时间不同,造成了o1堵塞和o3空闲,导致系统性能下降。p2代表时间感知分组算法,根据各个实例的平均完成时间的不同,分配不同数量的元组,使系统性能得到改善。
3.2 算法设计
时间感知分组算法为了适应数据流的分布和系统的变化,使用时间窗口来动态调整分组策略。时间感知分组算法分为两个阶段:再平衡阶段和分组阶段。每个时间窗口结束时,算法会根据上个时间窗口的统计信息进行重新调整,最后获得新的路由策略,这一阶段为再平衡阶段;使用当前的路由策略对到来的元组进行分组,这一阶段为分组阶段。
1) 再平衡阶段。再平衡阶段如图 3所示,主要包括高频键值的统计、高频键值的划分、高频键值路由表的建立等步骤。
对于大流量的流数据,如果对其全部键值进行准确统计,会带来大量的内存开销。时间感知分组算法使用Space-Saving近似算法[12]进行统计,Space-Saving近似算法可以近似统计高频键值的频率。Space-Saving算法只有参数ε,且满足0 < ε≤1。该算法共有[1/ε]个〈元组, 计数器〉对,其中包括了所有出现次数大于或等于εm的元组,m表示统计的元组总数。时间感知分组算法通过此步骤得到了数据流高频键值分布情况。
根据式(1),键值的路由策略应使下游算子实例的总完成时间相同。设下游算子有n个实例,单位时间内到来的元组总数为m,实例oi的平均完成时间为ti,所分配的元组数量为mi,则总完成时间li=ti·mi。为了使各个算子实例总完成时间达到均衡,则有∀1≤i < j≤n, li=lj,即ti·mi=tj·mj。由
$ {m_i} = m/\left( {{t_i} \cdot \sum\limits_{j = 1}^n {\frac{1}{{{t_j}}}} } \right). $ | (3) |
由于数据的倾斜性,不同高频键值的频率也不相同,因此算法首先对高频键值进行逻辑上的划分,将所有高频键值划分为大小相近的数据段。经过划分步骤之后,算法得到了数量大小相近的若干数据段。
对于每个数据段采用随机的方法分配算子实例,则根据式(3)可知,某个数据段分配到实例oi上的概率为
$ {p_i} = \frac{{{m_i}}}{m} = \frac{{1/{t_i}}}{{\sum\limits_{j = 1}^n 1 /{t_j}}}. $ | (4) |
对每个数据段都使用式(4)的概率进行随机分组,从而得到数据段到实例的映射关系。每个高频键值对应多个数据段,则可获得高频键值到下游算子实例的“一对多”的路由表。
2) 分组阶段。分组阶段如图 4所示,主要是对输入的单个元组进行分组,输出该元组所分配到下游算子的某个并行实例。
算法维护一个当前各个实例负载状况表。对于实例oi,设其平均完成时间为ti,已分配的元组数量为mi,则其当前负载li=ti·mi。当一个元组到来时,首先检查高频键值路由表。如果为高频键值,则从高频键值路由表中获取到该键值所对应的多个下游算子实例,将这些实例作为该元组的候选实例。根据算法维护的实例负载状况表,从所有候选实例中选择当前负载最小的实例作为目标实例输出;如果为非高频键值,则使用两个独立的Hash函数将该键值映射到算子实例空间,从而得到两个候选实例,从中选择负载较小的实例作为目标实例输出。时间感知分组算法如图 5所示。
3.3 算法分析 3.3.1 不均衡性
高频键值元组被划分为为若干大小相近的数据段,并且根据式(4)计算出的概率进行分组。对于任意实例oi及其分配概率pi,当元组数量m足够大时,实例oi被分配的元组数量mi,根据Bernoulli大数定律,对任意整数ε, 有
$ {l_i} = {m_i}{t_i} = m\frac{1}{{\sum\limits_{j = 1}^n 1 /{t_j}}}. $ | (5) |
由式(5)可知,当元组数量足够大时,高频键值分配到实例上的负载几乎相同,由式(2)可知,高频键值所带来的不均衡性很小。
对于低频键值,可以使用球和盒子的模型来描述分组过程:元组代表球,其键值代表球的不同颜色,下游算子并行实例代表盒子。假定将m个球放到n个盒子里,并且有m≥n2,而且球的颜色的分布服从最大概率p1≤1/(5n)的分布D,则从d个随机候选盒子中选择当前最小数量的盒子进行放置,其不均衡性在概率至少为1-1/n的情况下满足[5]:
$ I = \left\{ {\begin{array}{*{20}{l}} {O\left( {\frac{{\ln n}}{{\ln \ln n}}} \right), }&{d = 1;}\\ {O(1), }&{d \ge 2.} \end{array}} \right. $ | (6) |
根据式(6),对应时间感知分组算法的d=2,因此低频键值在大概率情况下所带来的不均衡性为O(1)。
综合高频键值和低频键值的不均衡性分析,系统的整体不均衡性在大概率情况下为O(1)。
3.3.2 算法开销算法开销主要包括分组器的内存开销和计算开销,以及下游算子维护状态的内存开销。
1) 分组器的内存开销。算法维护了高频键值路由表以及各个实例权值表,内存开销为
2) 分组器的计算开销。由于路由表采用Hash表数据结构,查找时间为O(1),然后从至多n个候选实例中进行选择,时间复杂度为O(n),因此总计算开销为O(n)。
3) 下游算子维护状态的内存开销。设高频键值集合大小为h,低频键值集合大小为l,则键值集合总大小k=h+l。低频键值最多由两个算子实例维护其状态信息,占用内存开销为O(2l)。高频键值每个至多有n个候选实例,则占用内存开销为O(n·h),因此占用的总内存开销为O(2k+(n-2)h)。根据式(6),将高频键值的频率阈值设定为f>1/(5n),则h≤5n,此时内存总开销为O(2k+5n2-2n)。
4 实验验证与分析 4.1 实验环境实验在Apache Flink分布式流处理平台上进行。实验所用的硬件环境配置和软件环境配置分别如表 2和3所示。实验所使用的数据集主要包括维基数据集。维基数据集包括了2007年10月这一个月之内的维基的访问历史日志[13]。
软件 | 版本 | 实例数 |
OS | CentOS 7 | 6 |
Flink | 1.7.2 | 1 JobManager 5 TaskManager |
Kafka | 2.12 | 3 Brokers |
Zookeeper | 3.4.13 | 3 |
本文提出的时间感知分组算法(TAG)所使用的对比算法包括分布感知键值分组算法(DKG)[8]、局部键值分布算法(PKG)[7]、网络感知分组算法(NAG)[9]和分布感知分组算法(DAG)[10]。实验主要从系统吞吐量、系统的平均处理延迟进行评价和分析。
4.2 实验结果与分析本文使用维基数据集来展现TAG算法在现实世界数据集上的优越性。维基数据集具有分布倾斜较大的特点。不同算法在维基数据集上的吞吐量大小随并行度的变化如图 6所示。TAG算法在不同的并行度下,系统吞吐量都表现最好。在并行度为12时,TAG算法的吞吐量相较于DAG算法提高了10%。TAG算法在面对高倾斜程度的数据集时,能够将键值按照其频率的不同,分组到多个下游算子实例中处理,不会造成不同实例处理的元组数量严重不均衡的情况,同时TAG算法又考虑到系统的异构性,每个实例根据其完成时间不同分到不同数量的元组,使得系统达到整体的负载均衡,因此吞吐量最大。
图 7则展示了不同算法在维基数据集上不同并行度下的平均处理延迟。TAG算法在不同的下游算子并行度下,平均处理延迟均最低。在并行度为12时,TAG算法相较于NAG算法,平均处理延迟降低了33%。本文提出的TAG算法不仅能够有效地分组高频键值以应对高倾斜程度的数据流,而且能够在多个候选实例中综合考虑网络传输时间和处理时间以及当前每个实例所处理的元组数量,从中优先选择平均完成时间较短和处理的元组数量较小的实例,从而使TAG算法的平均处理延迟最短。
最后,本文对各算法在下游算子并行度为12的情况下,其对应的吞吐量和平均处理延迟随时间的变化趋势进行了比较和评估。
如图 8所示,NAG算法和TAG算法相对于其余算法,能够动态地根据运行时的信息调整分组策略。TAG算法使用动态时间窗口,并且在每个时间窗口内分组策略都会产生较小的变化,从而使得TAG吞吐量存在较小波动。在平均处理延迟方面,如图 9所示,本文提出的TAG算法由于其基于时间窗口动态调整策略和基于时间感知的分组策略,因此能够使系统保持在稳定的低延迟的处理状态。
5 结论
本文提出了基于分布式流处理系统的时间感知分组算法,该算法通过分析系统的异构性并动态调整分组策略,从而使得系统的各个并行实例负载均衡,提高了系统性能。
在算法的设计上,时间感知分组算法采用动态时间窗口策略并分为再平衡和分组两个阶段,以适应数据流和系统的变化。算法根据统计结果,将数据分为高频键值和低频键值两类。对于高频键值,算法将其划分为大小相近的逻辑上的数据段,并将数据段根据各个实例的完成时间使用随机方法进行分组,对于每个到来的元组从多个候选实例中选择负载较小的实例;对于低频键值,算法使用两个独立的Hash函数得到两个候选实例,并选择负载较小的实例。
通过在Apache Flink分布式流处理平台上进行实验,本文将时间感知分组算法与已有的分组算法从吞吐量和平均处理延迟两个方面进行了对比。实验结果表明,时间感知分组算法比已有的分组算法在系统吞吐量上提高了10%,在平均处理延迟上降低了33%。
[1] |
TOSHNIWAL A, TANEJA S, SHUKLA A, et al. Storm@Twitter[C]//Özsu M T. Proceedings of the 2014 ACM SIGMOD International Conference on Management of Data. New York, USA, 2014: 147-156.
|
[2] |
CARBONE P, KATSIFODIMOS A, EWEN S, et al. Apache FlinkTM:Stream and batch processing in a single engine[J]. Bulletin of the IEEE Computer Society Technical Committee on Data Engineering, 2015, 38(4): 28-38. |
[3] |
BABCOCK B, BABU S, DATAR M, et al. Models and issues in data stream systems[C]//Proceedings of the 21st ACM SIGMOD-SIGACT-SIGART Symposium on Principles of Database Systems. New York, USA, 2002: 1-16.
|
[4] |
KATSIPOULAKIS N R, LABRINIDIS A, CHRYSANTHIS P K. A holistic view of stream partitioning costs[J]. Proceedings of the VLDB Endowment, 2017, 10(11): 1286-1297. DOI:10.14778/3137628.3137639 |
[5] |
FANG J H, CHAO P F, ZHANG R, et al. Integrating workload balancing and fault tolerance in distributed stream processing system[J]. World Wide Web, 2019, 1-26. DOI:10.1007/s11280-018-0656-0 |
[6] |
GEDIK B. Partitioning functions for stateful data parallelism in stream processing[J]. The VLDB Journal, 2014, 23(4): 517-539. DOI:10.1007/s00778-013-0335-9 |
[7] |
NASIR M A U, MORALES G D F, GARCIA-SORIANO D, et al. The power of both choices: Practical load balancing for distributed stream processing engines[C]//2015 IEEE 31st International Conference on Data Engineering. Seoul, South Korea, 2015: 137-148.
|
[8] |
RIVETTI N, QUERZONI L, ANCEAUME E, et al. Efficient key grouping for near-optimal load balancing in stream processing systems[C]//Proceedings of the 9th ACM International Conference on Distributed Event-Based Systems. New York, USA, 2015: 80-91.
|
[9] |
CHEN F, WU S, JIN H. Network-aware grouping in distributed stream processing systems[C]//Proceedings of the 18th International Conference on Algorithms and Architectures for Parallel Processing. Guangzhou, China, 2018: 3-18.
|
[10] |
PACACI A, ÖZSU M T. Distribution-aware stream partitioning for distributed stream processing systems[C]//Proceedings of the 5th ACM SIGMOD Workshop on Algorithms and Systems for MapReduce and Beyond. New York, USA, 2018: 1-10.
|
[11] |
NASIR M A U, HORII H, SERAFINI M, et al. Load balancing for skewed streams on heterogeneous cluster[Z]. arXiv preprint. arXiv: 1705.09073, 2017.
|
[12] |
METWALLY A, AGRAWAL D, EL ABBADI A. Efficient computation of frequent and top-k elements in data streams[C]//Proceedings of the 10th International Conference on Database Theory. Edinburgh, UK, 2005: 398-412.
|
[13] |
URDANETA G, PIERRE G, VAN STEEN M. Wikipedia workload analysis for decentralized hosting[J]. Computer Networks, 2009, 53(11): 1830-1845. DOI:10.1016/j.comnet.2009.02.019 |