As-Stream:一种针对波动数据流的算子智能并行化策略
李维1,2, 李城龙3, 杨家海3    
1. 清华大学 信息化技术中心, 北京 100084;
2. 中国地质大学(北京), 北京 100083;
3. 清华大学 网络科学与网络空间研究院, 北京 100084
摘要:大量研究提出了从在线资源管理层面来优化波动数据流的方法, 却忽略了从流应用层面来优化算子并行度。例如, 在Apache Storm中, 算子并行度一旦设置就无法进行动态调整。该文提出了一种针对波动数据流的算子智能并行化策略As-Stream, 显著提升了流计算平台的性能。该方法在弹性智能监控模块中基于无监督学习和自适应分析对参数进行实时调优。As-Stream包括并行瓶颈识别、参数计划生成、参数迁移转换和参数迁移调度算法。该系统在Apache Storm平台上实现, 并在真实的分布式流计算环境中进行了大量测试。结果表明, As-Stream性能比现有通用调度策略有显著提升:当资源充足时, 平均吞吐量提高了2.4倍; 当资源受限时, 平均延迟减小了44%。
关键词流计算    机器学习    算子并行度    资源分配    
As-Stream: An intelligent operator parallelization strategy for fluctuating data streams
LI Wei1,2, LI Chenglong3, YANG Jiahai3    
1. Information Technology Center, Tsinghua University, Beijing 100084, China;
2. China University of Geosciences, Beijing 100083, China;
3. Institute for Network Science and Cyberspace, Tsinghua University, Beijing 100084, China
Abstract: A large number of studies have presented methods using online resource management to optimize stream computing for fluctuating data streams, but have not optimized the parallel operator operations at the streaming application level. For example, in Apache Storm, the operator parallelism cannot be dynamically adjusted once it is set. This paper presents an intelligent parallelization strategy for operators with fluctuating data streams, As-Stream, which significantly improves the streaming computing platform performance. This method uses real-time tuning of parameters based on unsupervised learning and self-adaptive analyses in an elastic intelligent monitoring module. As-Stream includes parallel bottleneck identification, parameter plan generation, parameter migration conversion and parameter migration scheduling algorithms. The system was implemented on an Apache Storm platform with a large number of tests in a real distributed stream computing environment. The results show that this system significantly improves the performance compared with existing default scheduling strategies. With sufficient resources, the average throughput is increased 2.4 fold while with limited resources, the average latency is reduced by 44%.
Key words: stream computing    machine learning    operator parallelism    resource allocation    

大数据计算可以分为流计算、批计算、图计算、交互计算等多种不同形态,而流计算和批计算是大数据场景下处理和分析阶段的2种最主要的计算形态。通常,批计算提供精确计算,而流计算提供实时近似计算。批计算能够溯源,是一种结果确定型计算模式;流计算处理结果会随着时间而改变,是一种结果不确定型计算模式。批计算依赖于磁盘存储,而流计算依赖于内存存储。批计算在作业级别可以构成更大的依赖关系;而流计算通过增量消费建立依赖任务。因此,与批计算相比,流计算能够提供实时、持续、增量的数据处理和分析服务。

随着在线应用服务的不断发展,分布式流计算平台(DSPS)使大数据应用程序能够处理连续的数据流并获得实时反馈。目前,主流的DSPS包括:Apache Storm[1]、Apache Heron[2]、Apache Flink[3]、Apache Spark Streaming[4]、Apache Samza[5]、Apache Apex[6]、Google Cloud Dataflow[7]和Timely Dataflow[8]等。这些DSPS大多是在数据流图中描述流应用,并且支持有状态算子[9]。大量流计算相关研究从在线资源管理层面为波动数据流提供优化,却忽略了流应用层面的算子并行度优化。例如,在Apache Storm中,算子并行度一旦设置就无法进行动态调整。本文在研究DSPS结构特征、工作机制以及参数配置的基础上,结合其中有代表性的DSPS进行适用性分析[10-11],提出并设计了一种针对波动数据流的算子智能并行化策略As-Stream,通过机器学习和自适应方法实现了在运行时的持续调优[12-13],以独立的监控模块将不同的DSPS平台看成不同的整体[14],如图 1所示。随着环境的变化,监控模块能够实时收集信息,完成资源分配的智能优化,从而保证平台的流畅运行[15]

图 1 As-Stream监控多种流计算平台

本文通过实验验证了算子并行度对流计算平台的性能有显著影响,并基于无监督学习和自适应分析对参数进行实时调优。从流应用层面和资源管理层面构建了弹性智能监控模块。在流应用层面上,设计了并行瓶颈识别、参数计划生成和参数迁移转换算法;在资源管理层面上,设计了参数迁移调度算法,以避免由可插拔调度可能带来的新瓶颈。在Apache Storm平台上实现了As-Stream,并在分布式流计算真实环境中进行了大量的实验分析。实验结果表明As-Stream与现有通用调度策略相比性能上有显著提升。

1 相关研究

DSPS通常伴随着大量的参数包括并行度、I/O行为、内存设置等,这些参数会对作业执行和资源分配产生影响[16]。这些参数控制平台执行的多个方面,从低级别的内存和线程数配置到高级别的资源管理和负载平衡决策[17]。DSPS中的参数无法在运行时针对环境变化做出自适应调整,本文分析了Apache Storm平台上的常用参数,包括工作进程(worker)、喷口(spout)、螺栓(bolt)等关键性能配置参数,如表 1所示。

表 1 Apache Storm关键性能配置参数
参数名称 参数说明
supervisor.slots.ports 每台机器的工作进程数
supervisor.worker.timeout.secs 工作进程的超时时间,单位秒,超时后会重新分配
supervisor.heartbeat.frequency.secs 心跳发送频率
supervisor.monitor.frequency.secs 检测工作进程心跳的频率
topology.workers 整个拓扑的工作进程数
topology.tasks 每个喷口或螺栓的任务线程数
topology.max.task.parallelism 整个喷口或螺栓的最大任务线程数
topology.worker.receiver.thread.count 每个工作进程的元组接收线程数
topology.acker.executors 跟踪喷口发出元组的线程数
topology.max.spout.pending 在喷口任务上待处理的最大元组数
topology.executor.receive.buffer.size 每个执行线程的接收队列大小
topology.transfer.buffer.size 每个工作进程的传输队列大小
topology.producer.batch.size 在发送到目标执行线程之前要批处理的元组数
topology.transfer.batch.size 在发送到目标工作进程之前要批处理的元组数
worker.heartbeat.frequency.secs 工作进程的心跳发送时间间隔
task.heartbeat.frequency.secs 任务线程的心跳发送时间间隔

本文通过对关键性能配置参数的人工干预,统计了Apache Storm上统计词数(wordcount)实例的单位吞吐延迟,如表 2所示。按照表 2中序号初步对比分析了参数变化对平台性能的影响,如表 3所示。

表 2 单位吞吐延迟初步实验数据
序号 工作进程数量 喷口数量 螺栓数量 进程延迟/ms 执行延迟/ms
拆分任务 计数任务 拆分任务 计数任务 拆分任务 计数任务
1 1 5 8 12 1.14 685.66 2.70 1.35
2 1 15 8 12 3.89 904.07 3.22 1.72
3 1 5 24 12 2.78 276.60 2.74 1.03
4 1 5 8 36 3.06 1 321.13 12.91 6.01
5 3 5 8 12 0.60 199.60 1.20 0.48
6 3 15 8 12 0.45 146.43 1.76 0.98
7 3 5 24 12 0.96 902.46 4.40 2.34
8 3 5 8 36 1.94 1 719.72 10.67 5.46

表 3 参数变化对平台性能影响
对比序号 结果
1, 5 工作进程数量扩大倍数与单位吞吐延迟减少倍数呈正相关
1, 25, 6 喷口数量扩大倍数与延迟变化几乎没有影响
1, 35, 7 拆分任务螺栓数量扩大倍数与拆分单位吞吐延迟扩大倍数呈正相关
1, 45, 8 计数任务螺栓数量扩大倍数与计数单位吞吐延迟扩大倍数呈正相关

基于实验结果分析,通过调整算子并行度配置能够对流应用性能产生影响。因此,如果能够在非人工干预条件下,不断产生更好的算子迁移路径和资源分配,将能够持续提高DSPS的处理性能。然而,耗时的训练过程一定程度上限制了参数配置优化的实时性,状态和数据的不一致也会造成相当大的开销。引入机器学习方法解决这个问题是一个具有挑战性的研究方向。

文[18]提出了一种双指数平滑方法来预测异常事件,解决了Markov模型需要训练过程的缺点。文[19]提出了一种动态工作负载分配机制,用于分布式流计算引擎中操作之间的负载均衡。文[20]提出了一种具有成本效益的资源分配模型,根据不同的资源分配方案和工作负载来分析实际集群中的应用程序。文[21]在拟合的基础上引入了学习率和折扣因子的思想,从而达到更好地训练结果函数的目的。文[22]提出了一种最大积压优先(LBF)算法以及监控模块Hone,通过最小化并行任务的最大队列积压来减少延迟,并以非侵入式的方式实现。文[23]提出了一种基于流应用程序的管道数据处理模型,通过学习分析和加入监测期间概率分布均值,有效降低数据集和函数值的误差,提高训练的效率和准确率。上述方法与As-Stream的比较如表 4所示。

表 4 As-Stream与相关工作比较
方法 全面性 并行度 机器学习 节约成本 节约资源
文[18] × × × ×
文[19] × × ×
文[20] × ×
文[21] ×
文[22] ×
文[23] ×
As-Stream

2 As-Stream

As-Stream通过监控程序获取Apache Storm应用程序接口中的算子相关信息[24],将实时清洗数据集通过并行度瓶颈识别算法生成瓶颈级别。将瓶颈级别和算子相关信息通过参数计划生成算法生成参数计划。将参数计划通过参数迁移转换算法生成迁移路径并存储在路由表中。路由表能够稳定地控制内存开销和计算开销,性能上能够满足实时性需求[25]。Apache Storm中默认调度不考虑进程间或节点间优化,会出现计算资源因通信瓶颈导致实例参数配置不佳[26]。除此之外,当可插拔调度产生开销大于实例优化时,很可能会成为影响性能的新瓶颈[27]。因此,有必要在保证通用性的基础上设计一个能够直接解析算子迁移路由表的调度策略[28]。当满足预设条件时,由监控模块执行指令,按照算子迁移路由表完成资源再分配[29]

为了简化模型,本文暂不考虑拟合后的非线性关系[30]。在监测期,根据概率分布取均值有效地降低数据集和函数值的误差,提高训练的效率和准确率[31]。基于预测思想,当上游邻接操作的输入数据与输出数据的比率已知时,能够提前获得下游邻接操作的输入数据[23]。同时,当拟合函数趋于稳定时,可以进入非监测期,从而降低训练开销,释放资源[32]。As-Stream通过监控模块的设计保证了跨平台通用性,整个流程和架构设计分别如图 23所示。

图 2 As-Stream流程图

图 3 As-Stream架构图

2.1 算子获取清洗

As-Stream监控模块通过调用Apache Storm接口获取算子相关信息,包括执行延迟、进程延迟、吞吐量等。

通过定义单位平均延迟Ataski和单位总延迟ζ,来反映单位时间内拓扑中任务的延迟情况:

$ {\rm{Atask}}_i=\frac{{\rm{Ltask}}_i}{{\rm{Ntask}}_i}, $ (1)
$ \zeta=\sum\limits_{i=1}^M {\rm{Atask}}_i=\sum\limits_{i=1}^M \frac{\mathrm{Ltask}_i}{{\rm{Ntask}}_i}. $ (2)

其中:数据集长度为M,Ltaski表示单位时间内第i条数据的任务延迟,Ntaski表示单位时间内第i条数据的任务执行次数。

2.2 并行瓶颈识别

该策略依次遍历拓扑中每个执行任务中的执行延迟,并量化得到瓶颈优先级,按照每个拓扑中的瓶颈优先级从高到低排序,从而得到瓶颈级别序列。

$ T_{\text {calc }}\left(n_i, c_j\right)=\frac{m\left(x\left(c_j\right)\right)}{p\left(n_i\right)} . $

其中:Tcalc(ni, cj)表示节点ni上任务cj的计算时间,m函数表示当前任务复杂度下所需的处理能力,x函数表示计算任务的复杂度,p函数表示分配在节点上算子的处理能力。

本文考虑不同网络环境中节点间的通信瓶颈因素,依次遍历每个节点的执行任务。

$ T_{\mathrm{comm}}\left(n_i, c_j\right)=\frac{1}{2}\left(\frac{m\left(x\left(c_j\right)\right)}{l_{i-1, i}}+\frac{m\left(x\left(c_j\right)\right)}{l_{i, i+1}}\right) . $ (4)

其中:Tcomm(ni, cj)表示nicj的通信时间,li-1, ili, i+1分别表示ni-1ninini+1的传输链路带宽。

分别计算所有节点上执行任务的计算时间和通信时间总和,较大者为瓶颈时间T

$ T=\max \left\{\operatorname{sum}\left(T_{\text {calc }}\left(n_i, c_j\right)\right), \operatorname{sum}\left(T_{\text {comm }}\left(n_i, c_j\right)\right)\right\} . $ (5)

本文设计了线性回归的阈值识别函数,通过最小二乘法来求解该目标函数的最优解,从而判定是否需要重新分配,具体步骤如下:

1) 根据上述并行瓶颈识别方法,建立一阶二项式线性回归公式并展开,能够得到精度区间内函数值与实际值之间的均值误差:

$ e\left(f, \gamma_i\right)=\frac{1}{M} \sum\limits_{i=1}^M\left(f\left(t_i\right)-\gamma_i\right)^2. $ (6)

其中:数据集(ti, γi)由初始数据进行清洗得到。ti表示第i条数据的时间戳,γi表示数据集在时间戳ti的平均延迟。f函数表示阈值识别函数。

2) 通过一阶二项式线性公式的线性回归函数得到系数ωb的偏导数:

$ \left\{\begin{array}{l} \frac{\partial e}{\partial \omega}=2\left(\omega \sum\limits_{i=1}^M t^2-\sum\limits_{i=1}^M t_i\left(\gamma_i-b\right)\right) , \\ \frac{\partial e}{\partial b}=2\left(M b-\sum\limits_{i=1}^M t_i\left(\gamma_i-\omega t_i\right)\right). \end{array}\right. $ (7)

3) 令式(7)中2个等式为0,可得到ωb最优解:

$ \left\{\begin{array}{l} \omega=\frac{\sum\limits_{i=1}^M \gamma_i\left(t_i-\bar{t}\right)}{\sum\limits_{i=0}^M t_i^2-\frac{1}{M}\left(\sum\limits_{i=1}^M t_i\right)^2}, \\ b=\frac{1}{M} \sum\limits_{i=1}^M\left(\gamma_i-\omega t_i\right). \end{array}\right. $ (8)

4) 当训练集与拟合的线性标注函数的Euclid距离之和最小时,标注函数为阈值识别函数。当拟合函数稳定时进入非监测期,阈值识别函数与拓扑相关联并且可以被存储。因此,每个阈值识别函数的拟合效果不依赖于训练集的选择,而依赖于平台无监督学习时长。值得注意的是,阈值识别结果在当前拓扑实例中生成,并在拓扑生命周期结束时销毁。

5) 得到阈值识别函数后,实时比较f(ti)和γi并记录e(f, γi)。当e(f, γi)为正时,将f(ti)记录为有效瓶颈值。

记录瓶颈时间间隔内的最大有效瓶颈值作为参考瓶颈时间Tbottleneck(瓶颈时间间隔取决于数据集间隔和配置文件中初始重分配时间设置)。

6) 当Tbottleneck在同一区间内出现多次,并且与T在同一个数量级时,将Tbottleneck存放到瓶颈级别表lbottleneck中,并将lbottleneck从高到低进行排序。

2.3 参数计划生成

在生成参数计划时提供了2种优先策略:

1) 瓶颈级别优先策略:瓶颈级别最高的任务优先分配算子,剩余算子依次按照瓶颈级别顺序分配。这种分配策略更多地考虑了瓶颈级别的权重,适用于拓扑之间瓶颈时间差异较大的情况。

2) 参数计划优先策略:根据瓶颈级别从高到低,将执行线程数量加上瓶颈级别后乘以分配系数φ(默认为1)。同时,任务线程数相应增加上述结果与执行线程数同样的倍数。这种分配策略通过分配系数φ来控制瓶颈级别的权重,适用于拓扑之间的瓶颈时间差异不大的情况:

$ \bar{N}_{\text {task }}=\varphi \frac{N_{\text {executer }}+N_{\text {bottleneck }}}{N_{\text {executer }}} N_{\text {task }} \text {. } $ (9)

其中:Nexecuter表示执行线程数量,Nbottleneck表示瓶颈级别,Ntask表示原任务线程数量,Ntask表示新任务线程数量。参数计划示例如表 5所示。

表 5 参数计划示例
Nexecuter Ntask Nbottleneck Ntask
8 16 3 22
10 10 2 12
5 10 1 12

2.4 参数迁移转换

通过将参数计划转换为实际迁移存储在路由表中,从而生成参数迁移路径。算子分布表示任务线程在执行线程中的分布情况,例如“[1][2]…”表示任务线程[1]和[2]在执行线程中最前面的分布位置。参数迁移路径示例如表 6所示。

表 6 参数迁移路径示例
节点 端口 执行线程编号 任务线程编号 算子分布 迁移路径
源节点 目标节点 源端口 目标端口
N1 6 700 1 [1][2] [1][2]… N1N1 N3N3 6 7006 700 6 7026 702
N2 6 701 2 [3] …[3]… N2 N2 6 701 6 700
N3 6 702 3 [9][10] …[9][10]… N3N3 N1N1 6 7026 702 6 7016 701

2.5 参数迁移调度

考虑在优化过程中复杂的可插拔调度可能会产生瓶颈,本文设计了能够直接识别参数迁移路径的参数迁移调度。对于第一个操作,通常引入一个虚拟操作作为其上游邻接操作。上游邻接操作oj在固定时间间隔Δw在窗口w的输出数据,记为Djout(w):

$ D_j^{\text {out }}(w)=\operatorname{sel}_j \cdot D_j^{\text {in }}(w)+q_j^{\text {out }}(w). $ (10)

其中:Djin(w)是oj在窗口w的输入工作量,等于oj的输入队列的长度;qjout(w)是oj的输出队列的长度,seljoj的选择率,描述了oj的输出工作量与输入工作量的比率。对于每个操作,其输出和输入之间都存在函数关系,可以通过学习分析得到。本文暂时只考虑线性关系,取监测期内概率分布的平均值为selj。通过估计窗口(w+1)的输入工作量oi,这与oi与其上游邻接oj之间的带宽有关,并受其约束:

$ D_i^{\text {in }}(w+1)=\sum\limits_{o_j \in U_i} \min \left(D_i^{\text {out }}(w), B_{j i}(w)\right) . $ (11)

其中:Uioi的上游邻接操作集;Bjioioj之间的带宽,取Bj(w)和Bi(w)中的较小值。

2.6 算法代码

As-Stream中并行瓶颈识别、参数计划生成、参数迁移转换、参数迁移调度算法代码,分别如图 47所示。

图 4 并行瓶颈识别算法代码

图 5 参数计划生成算法代码

图 6 参数迁移转换算法代码

图 7 参数迁移调度算法代码

3 实验分析验证 3.1 实验环境

本文在阿里云跨地域集群下进行了数据实验。集群由30台机器构成,分为6组,分布在中国3个不同地区的6个城市。其中,1台机器指定为Apache Storm主节点,2台指定为Apache Zookeeper节点,其余27台机器作为Apache Storm子节点。集群通过不同地理位置、网段来模拟可能存在的通信瓶颈;通过不同配置来模拟可能存在的计算瓶颈。运行环境的软件配置信息为:Ubuntu操作系统版本20.04,Apache Storm版本2.1.0,JDK版本1.8,Apache Zookeeper版本3.4.14,Apache Kafka版本2.3.0,Redis版本6.0.5。另外,CPU、内存、带宽配置如表 7所示。

表 7 分布情况和硬件配置
地区 分组 城市 IPv4 配置
中国北部 1 北京 8.141.53.54 2核CPU,4 GB内存,2 Mb/s带宽
8.141.51.49
8.141.54.57
8.141.61.229
8.141.62.58
8.141.62.157
8.141.65.225
39.105.92.56
8.141.67.243
8.141.64.143 4核CPU,8 GB内存,2 Mb/s带宽
39.106.101.48
8.141.62.37
2 张家口 8.142.98.25 2核CPU,4 GB内存,1 Mb/s带宽
8.142.97.127
8.142.96.158
3 呼和浩特 39.99.53.176
39.99.57.0
39.104.165.184
中国东部 4 上海 106.15.76.152 2核CPU,4 GB内存,1 Mb/s带宽
139.196.112.68
106.15.79.79
5 杭州 47.98.20.201
118.31.169.141
118.31.170.137
中国南部 6 深圳 39.108.92.198 2核CPU,4 GB内存,1 Mb/s带宽
39.108.61.226
39.108.91.147
120.78.206.230
47.119.193.12
120.78.199.11

为了清楚地表示不同节点之间的物理距离,本文绘制了集群分组距离分布图,如图 8所示。

图 8 分组物理距离分布图

3.2 实验过程

将Storm中包含拆分和计数2个子任务的统计词数实例通过有向无环图(DAG)形式提交给计算集群。喷口是数据入口点,将数据转换为一连串的元组(tuples);螺栓是元组的处理组件,将输入元组转化为输出元组或结果,如图 9所示。

图 9 Storm中统计词数实例的DAG

Storm中统计词数实例通过模拟随机词输入到喷口中。当并行度设置为1时,无需调整参数,本文重点测试并行度设置大于1时的平台性能。本文模拟了不存在处理失败或超时元组时,容纳量(capacity)在20%以内资源充足的情况。同时,通过减少一半节点数,模拟了存在处理失败或超时元组时,容纳量在60%~80%资源受限的情况。

3.3 实验结果

以下将Apache Storm默认调度策略简称为Storm。本文对比分析了Storm和As-Stream在[38, 50]和[6, 186] min内的实验数据,其中,实验数据涉及到4个评价指标分别为:吞吐量指标、延迟指标、容纳量指标和能耗指标。

1) 吞吐量指标。

吞吐量反映了平台的承压能力。平台在较高的输入速率下会产生较高的累计吞吐量,在较低的输入速率下会产生较低的累计吞吐量。累计吞吐量能够反映平台在一定时间内持续的承压能力。

Storm和As-Stream在[21, 186] min范围内,统计词数实例上累计吞吐量在资源充足和资源受限情况下的数据集对比,如图 10所示。

图 10 Storm和As-Stream累积吞吐量对比

将统计词数实例在[21, 186] min范围内累计吞吐量除以时间,得到平均吞吐量。在资源充足情况下,Storm和As-Stream的平均吞吐量分别为94.355和226.613 tuples/s,As-Stream平均吞吐量比Storm提高了2.4倍;在资源受限情况下,Storm和As-Stream的平均吞吐量分别为42.217和77.679 tuples/s,As-Stream平均吞吐量比Storm提高了1.84倍。

2) 延迟指标。

执行延迟是通过从函数执行到每个DAG结束的时间戳评估;进程延迟是通过从元组到达传递到已确认的每个DAG的时间戳评估。执行延迟和进程延迟分别反映了所有正在运行的DAG的总体执行和处理时间,总延迟约等于处理延迟和进程延迟的总和。总延迟越小,数据处理能力越强。

在资源充足和受限情况下,Storm和As-Stream在[38, 50]和[6, 186] min,单位执行、进程延迟和单位任务延迟如图 1114所示。

图 11 资源充足下Storm和As-Stream单位执行、进程延迟

图 12 资源充足下Storm和As-Stream单位任务延迟

图 13 资源受限下Storm和As-Stream单位执行、进程延迟

图 14 资源受限下Storm和As-Stream单位任务延迟

图 1114中的数据集求均值后乘以平均吞吐量的倍数,得到平均延迟数据集如表 8所示。

表 8 Storm和As-Stream的平均延迟数据集
指标 时间范围/min 资源情况 平均延迟/ms
Storm As-Stream
执行延迟 [38, 50] 资源充足 25.927 18.732
资源受限 57.342 30.617
[6, 186] 资源充足 24.521 19.557
资源受限 53.415 31.955
进程延迟 [38, 50] 资源充足 149.692 100.552
资源受限 338.829 179.659
[6, 186] 资源充足 150.315 106.190
资源受限 337.329 187.174
总延迟 [38, 50] 资源充足 175.620 119.284
资源受限 396.172 210.278
[6, 186] 资源充足 174.836 125.750
资源受限 390.745 219.131

在资源充足和受限情况下,Storm和As-Stream在[38, 50]和[6, 186] min范围内,单位总延迟数据集对比,如图 15所示。

图 15 Storm和As-Stream单位总延迟对比

3) 容纳量指标。

容纳量是螺栓执行元组所花费的时间百分比。容纳量越大,负载越大。在参数调整上,需要扩大喷口或螺栓并行度来降低负载。在资源受限情况下运行在[38, 50]和[6, 186] min范围内,统计词数实例上容纳量数据集对比,如图 16所示。

图 16 Storm和As-Stream容纳量对比

在[38, 50] min范围内,对Storm和As-Stream的容纳量数据集分别求均值后为6.641%和1.958%,在[6, 186] min范围内,对Storm和As-Stream的容纳量数据集分别求均值后为6.39%和2.08%。

4) 能耗指标。

能耗指标反映了平台CPU利用率情况,是衡量系统负荷的关键指标。CPU利用率越高,能耗越高。通过实时监控,Storm和As-Stream在资源受限情况下运行在[6, 186] min时,CPU利用率均值分别为21.6%和37.66%。

本文通过观察发现在平台运行的前5min,数据受环境干扰影响较大。因此,通过分析[6, 186] min范围内数据得出结论:与Storm相比,在资源充足情况下,As-Stream平均吞吐量提高了2.4倍,平均延迟减少了28%;在资源受限情况下,As-Stream平均吞吐量提高了1.84倍,平均延迟减少了44%。As-Stream平均吞吐量优化效果在资源充足情况下更明显,平均延迟优化效果在资源受限情况下更明显。

4 结论

本文基于机器学习和自适应分析,提出并设计了一种针对波动数据流的算子智能并行化策略As-Stream。它独立于流计算平台之外,通过监控模块实时地获取数据并进行无监督学习,从而在流应用和资源管理层面有效地促进资源合理分配。通过瓶颈识别、计划生成和迁移转换算法解决了运行时算子并行度不透明的问题。同时,还设计了兼顾通用性和实时性的参数迁移调度。实验结果表明与现有通用调度策略相比,As-Stream具有更低的延迟和更高的吞吐量。

下一步将逐步完成对更多主流大数据流计算平台的兼容,并通过扩充深度学习算法完成持续弹性优化提升。

参考文献
[1]
Apache Storm[EB/OL]. (2013-11-05)[2021-12-30]. https://storm.apache.org.
[2]
Apache Heron[EB/OL]. (2015-09-25)[2021-12-30]. https://heron.apache.org.
[3]
Apache Flink®[EB/OL]. (2014-06-07)[2021-12-30]. https://flink.apache.org.
[4]
Apache. Spark Streaming[EB/OL]. (2014-02-25)[2021-12-30]. https://spark.apache.org/streaming.
[5]
Apache Samza[EB/OL]. (2013-11-05)[2021-12-30]. https://samza.apache.org.
[6]
Apache ApexTM[EB/OL]. (2015-03-14)[2021-12-30]. https://apex.apache.org.
[7]
Google. Cloud Dataflow[EB/OL]. (2014-6-05)[2021-12-30]. https://cloud.google.com/dataflow.
[8]
Apache. Timely Dataflow[EB/OL]. (2014-12-07)[2021-12-30]. https://github.com/timelydataflow.
[9]
ZHAO Y J, LIU Z, WU Y D, et al. Timestamped state sharing for stream analytics[J]. IEEE Transactions on Parallel and Distributed Systems, 2021, 32(11): 2691-2704. DOI:10.1109/TPDS.2021.3073253
[10]
DENG S Z, WANG B T, HUANG S, et al. Self-adaptive framework for efficient stream data classification on storm[J]. IEEE Transactions on Systems, Man, and Cybernetics: Systems, 2020, 50(1): 123-136. DOI:10.1109/TSMC.2017.2757029
[11]
MUHAMMAD A, ALEEM M, ISLAM M A. TOP-Storm: A topology-based resource-aware scheduler for stream processing engine[J]. Cluster Computing, 2021, 24(1): 417-431. DOI:10.1007/s10586-020-03117-y
[12]
AL-SINAYYID A, ZHU M. Job scheduler for streaming applications in heterogeneous distributed processing systems[J]. The Journal of Supercomputing, 2020, 76(12): 9609-9628. DOI:10.1007/s11227-020-03223-z
[13]
ZHENG T Y, CHEN G, WANG X Y, et al. Real-time intelligent big data processing: Technology, platform, and applications[J]. Science China Information Sciences, 2019, 62(8): 82101. DOI:10.1007/s11432-018-9834-8
[14]
LI W, SUN D W, GAO S, et al. A machine learning-based elastic strategy for operator parallelism in a big data stream computing system[C]//Proceedings of the 12th EAI International Conference on Broadband Communications, Networks, and Systems. Melbourne, Australia: Springer, 2021: 3-19.
[15]
ZHOU Q H, GUO S, LU H D, et al. Falcon: Addressing stragglers in heterogeneous parameter server via multiple parallelism[J]. IEEE Transactions on Computers, 2021, 70(1): 139-155. DOI:10.1109/TC.2020.2974461
[16]
HERODOTOU H, CHEN Y X, LU J H. A survey on automatic parameter tuning for big data processing systems[J]. ACM Computing Surveys, 2021, 53(2): 43.
[17]
MUHAMMAD A, ALEEM M. A3Storm: Topology, traffic, and resourceaware storm scheduler for heterogeneous clusters[J]. The Journal of Supercomputing, 2021, 77(2): 1059-1093. DOI:10.1007/s11227-020-03289-9
[18]
CHENG D Z, ZHOU X B, WANG Y, et al. Adaptive scheduling parallel jobs with dynamic batching in spark streaming[J]. IEEE Transactions on Parallel and Distributed Systems, 2018, 29(12): 2672-2685. DOI:10.1109/TPDS.2018.2846234
[19]
FANG J H, ZHANG R, FU T Z J, et al. Distributed stream rebalance for stateful operator under workload variance[J]. IEEE Transactions on Parallel and Distributed Systems, 2018, 29(10): 2223-2240. DOI:10.1109/TPDS.2018.2827380
[20]
ISLAM M T, KARUNASEKERA S, BUYYA R. dSpark: Deadline-based resource allocation for big data applications in Apache Spark[C]//Proceedings of the 2017 IEEE 13th International Conference on e-Science (e-Science). Auckland, New Zealand: IEEE, 2017: 89-98.
[21]
WANG W A, ZHANG C, CHEN X J, et al. An on-the-fly scheduling strategy for distributed stream processing platform[C]//Proceedings of the 2018 IEEE International Conference on Parallel and Distributed. Melbourne, VIC, Australia: IEEE, 2018: 773-780.
[22]
LI W X, LIU D W, CHEN K, et al. Hone: Mitigating stragglers in distributed stream processing with tuple scheduling[J]. IEEE Transactions on Parallel and Distributed Systems, 2021, 32(8): 2021-2034. DOI:10.1109/TPDS.2021.3051059
[23]
WEI X H, LI L, LI X, et al. Pec: Proactive elastic collaborative resource scheduling in data stream processing[J]. IEEE Transactions on Parallel and Distributed Systems, 2019, 30(7): 1628-1642. DOI:10.1109/TPDS.2019.2891587
[24]
ARMBRUST M, DAS T, TORRES J, et al. Structured streaming: A declarative API for real-time applications in Apache Spark[C]//Proceedings of the 2018 International Conference on Management of Data. Houston, TX, USA: ACM, 2018: 601-613.
[25]
SUN D W, HE H Y, YAN H B, et al. Lr-Stream: Using latency and resource aware scheduling to improve latency and throughput for streaming applications[J]. Future Generation Computer Systems, 2021, 114: 243-258. DOI:10.1016/j.future.2020.08.003
[26]
AO W C, PSOUNIS K. Resource-constrained replication strategies for hierarchical and heterogeneous tasks[J]. IEEE Transactions on Parallel and Distributed Systems, 2020, 31(4): 793-804. DOI:10.1109/TPDS.2019.2945294
[27]
CAO H Y, WU C Q, BAO L, et al. Throughput optimization for storm-based processing of stream data on clouds[J]. Future Generation Computer Systems, 2020, 112: 567-579. DOI:10.1016/j.future.2020.06.009
[28]
LIU S C, WENG J P, WANG J H, et al. An adaptive online scheme for scheduling and resource enforcement in storm[J]. IEEE/ACM Transactions on Networking, 2019, 27(4): 1373-1386. DOI:10.1109/TNET.2019.2918341
[29]
ESKANDARI L, MAIR J, HUANG Z Y, et al. T3-Scheduler: A topology and traffic aware two-level scheduler for stream processing systems in a heterogeneous cluster[J]. Future Generation Computer Systems, 2018, 89: 617-632. DOI:10.1016/j.future.2018.07.011
[30]
CHEN H H, ZHANG F, JIN H. PStream: A popularity-aware differentiated distributed stream processing system[J]. IEEE Transactions on Computers, 2021, 70(10): 1582-1597. DOI:10.1109/TC.2020.3019689
[31]
KHATIBI E, MIRTAHERI S L. A dynamic data dissemination mechanism for Cassandra NoSQL data store[J]. The Journal of Supercomputing, 2019, 75(11): 7479-7496. DOI:10.1007/s11227-019-02959-7
[32]
LIU P C, XU H L, DA SILVA D, et al. FP4S: Fragment-based parallel state recovery for stateful stream applications[C]//Proceedings of the 2020 IEEE International Parallel and Distributed Processing Symposium (IPDPS). New Orleans, LA, USA: IEEE, 2020: 1102-1111.