基于动态获取高频率键的MapReduce性能优化算法
李建江1, 滑水亮2, 吴杰3, 张凯1     
1. 北京科技大学 计算机科学与技术系, 北京 100083, 中国;
2. 国家电网张家口供电公司信通分公司, 张家口 075000, 中国;
3. 天普大学 计算机与信息科学系, 费城 19122, 美国
摘要:在云计算技术领域中,MapReduce能够帮助人们快速处理海量数据,因此在学术界以及工业界越来越受到重视。但是MapReduce在处理以文本为中心的应用时,中间结果中数据重复较多。针对该情况,已有的高频率缓冲(frequency buffering,FB)算法提出在环形内存缓冲之前添加哈希表,并将高频率键存储在哈希表中。该算法通过采样来实现,有额外开销并且统计出的高频率键并不一定准确。该文提出一种基于动态获取高频率键的MapReduce性能优化算法,通过在环形内存缓冲之前增加计数Bloom过滤器(counting Bloom filter,CBF)和哈希表,将高频率键动态地存储在哈希表中。该算法获得的高频率键更准确,同时大大减少了数据排序和磁盘I/O的开销。实际测试结果表明:该算法明显提高了作业的执行速度,比原始MapReduce提高17.04%,比FB算法提高9.31%。
关键词MapReduce    高频率键    性能优化    计数Bloom过滤器    
Performance optimization algorithm for MapReduce based on obtaining frequent keys
LI Jianjiang1, HUA Shuiliang2, WU Jie3, ZHANG Kai1     
1. Department of Computer Science and Technology, University of Science and Technology Beijing, Beijing 100083, China;
2. State Grid Zhangjiakou Power Supply Company & Telecommunication Branch, Zhangjiakou 075000, China;
3. Department of Computer and Information Sciences, Temple University, Philadelphia 19122, USA
Abstract: MapReduce is getting much attention in academia and industry for use in cloud computing to quickly deal with huge amounts of data. However, when MapReduce deals with text-centric applications, the algorithm generates is large amount of duplicate data in the intermediate results that increases the run time. A frequency buffering (FB) algorithm was used to add a Hash table before the ring memory to store frequent keys in a Hash table. However, since the algorithm is implemented by sampling, the algorithm may not accurately estimate the overhead and the frequent keys. Therefore, this study added a performance optimization algorithm to MapReduce to obtain the frequent keys by adding a counting Bloom filter (CBF) and a Hash table to dynamically filter the frequent keys before storing them in the ring memory. This algorithm more accurately identifies the frequent keys and greatly reduces the data sorting overhead and the disk I/O. Tests show that this performance optimization algorithm for MapReduce for obtaining the frequent keys significantly improves the execution speed by 17.04% compared to the original MapReduce and 9.31% higher than the frequency buffering algorithm.
Key words: MapReduce     frequent key     performance optimization     counting Bloom filter    

随着互联网时代的来临以及信息时代的迅速发展,人类逐渐进入信息开放和信息充裕[1]的时代。目前来看,信息依然呈现出不断增长的趋势。面对海量信息,如何找到一种低成本、安全、可靠的技术来计算不断增长的信息,从而获得有用的信息成为一个难题。MapReduce的提出为该问题带来了很好的解决方案,它是一种编程模型,用于处理和计算大规模的数据,使之实现并行运算,其Java的开源实现Hadoop[2]作为一种可靠、稳定、高效的方式被越来越多地接受并运用。因此,研究MapReduce及其性能优化意义重大。

1 相关工作

随着MapReduce得到越来越多的关注,相应针对MapReduce的研究也在增多。其中,针对以文本为中心的应用的研究(如文章单词统计、倒排索引、自然语言处理等)就具有典型性,但是总的来说其优化方案是从以下几个方面展开的。

1) 参数调优。

文[3]详细研究了不同参数对MapReduce的性能影响,并通过调整参数来优化MapReduce的性能。参数调优的范围比较广泛,比如调整io.sort.factor。

2) 存储结构的优化。

文[4]运用列存储同时采用压缩技术明显提高了MapReduce性能,这在很大程度上提高了Hadoop的效率。文[5]详细介绍了存储结构RCFile,并将其运用到Hive中。文[6]实现了一个在虚拟环境中对数据局部性的全面而实用的解决方案vLocality,结合了一种新的存储架构,有效地减少了对共享磁盘的争用。

3) 调度算法优化。

调度是MapReduce中很重要的一个环节,其主要内容是将任务进行公平分配,充分利用空闲机器,同时尽可能地将任务分配给输入分片所在的机器,也就是计算向数据迁移,最大可能地减少I/O消耗。例如,截止时间调度算法[7]为了保证作业在截止时间之前完成,会根据作业中任务的运行进度和剩余时间来合理公平地获得集群资源。如果不能顺利完成就需要调整截止时间。为了使MapReduce可以更好地在异构环境下运行,文[8]根据底层硬件架构将节点“虚拟划分”为一些同构的子集群,并使用遗传算法来进行任务调度的优化。

4) 其他方面的优化。

合理分配和设置映射函数与集约函数的数量能够有效提高MapReduce的效率,默认值往往不能很好地体现MapReduce任务的需求。因此,有时需要合理运用合并函数,对本地数据进行合并。在处理应用的过程中,中间产生的数据可能很多是重复的或者数量太大。比如在单词统计中,因为词频的分布接近Zipf [9]分布,每个映射任务会产生大量的<the,1>这样的记录,如果不进行合并,都交给集约函数处理,将大大消耗I/O资源。文[10]旨在研究使用不同的Hadoop压缩选项来提高单词统计作业的计算性能,研究测试结果表明在对Map输出进行压缩时只有snappy和deflate可以较好地提高计算性能;在MapReduce未压缩时,使用bzip2输入文件可以减少磁盘空间并保持计算性能。文[11]对MapReduce中的单词统计应用程序进行了详细性能分析、表征和评估,并提出了一种基于Amdahl定律回归方法的估计模型,用于估计给定处理器体系结构的性能和总处理时间与不同输入大小的关系。

综上所述,目前针对MapReduce的研究越来越多。但大部分研究是通过以上几个方面展开的,也就是参数调优、存储结构的优化和调度算法优化。然而,以文本为中心的应用其处理的输入集很大,会产生大量的中间结果,如何减少中间结果值得深入研究。

文[12]提出了高频率缓冲(frequency buffering,FB)算法,该算法增加哈希表,将高频率键存储在哈希表中。由于对哈希表进行读写耗费的时间小于对环形内存执行相同操作所耗费的时间,因此将需要频繁读写的键存储在哈希表可以在很大程度上减少应用的运行时间。但是文[12]是通过提前局部采样来获取键,因此存在额外开销并且获取到的高频率键并不一定准确等缺点。所以,本文提出一种针对以文本为中心的应用的MapReduce性能优化算法,该算法在环形内存缓冲之前添加哈希表和计数Bloom过滤器(counting Bloom filter,CBF)[13-14]来动态获取并存储高频率的键,对Hadoop中的MapReduce进行性能方面的优化,减少了中间数据量,从而减少排序数据量和I/O磁盘读写数据量,最终提高了效率。

2 MapReduce优化算法实现

在自然语言处理过程中,一部分键出现的频率远远超出其他键出现的频率,即符合Zipf定律。此时,映射结果中会出现大量重复的键,后续的各种操作会对它们进行重复的操作。与自然语言的语料库中某个单词出现的频率与它在频率表中的排名成反比相类似,映射结果中会有一小部分键频繁出现。

由于文[12]通过采样方式来获取高频率键,这导致额外开销很大,而且需要考虑的问题也颇多。如果在键的频率分布中只有一小部分键的频率很高,则可将采样率设置得大一些,也可以获得相对粗糙的高频率键;反之,如果在键的频率分布中没有特别突出的高频键出现,则需要将采样率调整得小一些。本文提出的优化方案不仅需要添加一个哈希表来存储频繁出现的键,而且还需添加一个CBF用来动态实时地获取频繁出现的键(如图 1所示),从而将文[12]中离线模式修改为在线模式,降低MapReduce中洗牌阶段的I/O和排序开销。

图 1 本文提出的优化算法

在本文提出的优化方案中用到了CBF,由于利用Bloom过滤器(Bloom filter,BF)不仅需要查询元素而且还需要增加和删除元素,即需要动态地获取高频率键。其中,Key1、Key2分别为不同的键。标准BF并不支持增加和删除元素,为此需要将标准BF的每一位扩展为一个小的计数器。当往CBF中增加元素时,就将对应的k个计数器的值增加1(见图 2,其中k为哈希函数个数)。同理,当从CBF中删除元素时,就将k个计数器的值减少1。通过额外占用少许的存储空间,CBF能实现增加和删除功能。

图 2 CBF中元素的增加

在CBF中,哈希函数可能会带来冲突问题。如果某个集合S包括Key1、Key2等,并映射到CBF中。用某个键去进行映射查询,如果经过哈希运算后对应的比特位也都为1,则认为该键属于集合。虽然冲突的现象会导致一定的误判概率,但误判率非常低,因此这种数据结构目前已被众多研究广泛运用。

2.1 基于动态获取高频率键的MapReduce性能优化算法

本文提出的基于动态获取高频率键的MapReduce性能优化算法,可用全局分支控制策略(total control branch policy,TCBP)和CBF与哈希表数据传输(data transmit between CBF and Hash table,DCBH)这2个算法来描述。总体流程为间断性地让数据流过CBF,使其统计各个键的频率。如果键频率变高则将其交换到哈希表中,同理哈希表中频率较低的键有可能被交换到CBF中。如果CBF满,则将数据溢出到环形内存缓冲区。

TCBP算法是一个总的控制算法,其主要控制数据流向。如果数据在规定的范围内,则数据会先经过CBF然后再经过哈希表,这样使得哈希表中存储的是高频率键并且可以动态调整;DCBH算法主要描述数据在CBF与哈希表之间的交换和传递。

1) TCBP算法。

TCBP可描述本文提出的优化算法的总的数据控制流向。首先,需要描述要处理的数据走向。由于需统计数据流中键的值(即键出现的次数),同时映射的结果只能顺序读取一遍,因此本文通过CBF来挖掘高频的那些键,然后通过CBF和哈希表来进行数据交换,使哈希表中存储最为频繁的那些键。

TCBP算法如图 3所示。

图 3 TCBP算法伪代码

第1行:对映射任务节点数进行初始化。

第2~5行:对CBF和哈希表进行初始化。

第6行:对所有映射任务节点输出的键值对(key-value)执行循环判断操作。

第7~9行:判断键值对是否存在于输入文本的第sM~sM+N-1行(s=0,1,…)。如果存在,则执行DCBH算法,让数据流过CBF和哈希表,从而将高频率键存储在哈希表中。

第10~14行:如果键值对在输入文本的sM+N~(s+1)M-1行(s=0,1,…),则进一步判断该键值对所对应的键是否在哈希表中。如果在哈希表中,则将其对应键的频率值加1;如果不在哈希表中,则将该键值对存储到环形内存缓冲区中。当环形内存缓冲区满的时候,就会对环形内存缓冲区中的数据执行合并和排序操作,然后溢出存储到本地磁盘中。

第20~21行:如果环形内存已存储比例大于设定溢出比,则将环形内存中的数据写入磁盘。

算法1给出了针对以文本为中心的应用的MapReduce性能优化算法的总的控制策略。为了充分利用资源,每当数据流被采样处理(进行映射处理的同时进行采样处理,按照优化策略将频繁出现的键填充到CBF和哈希表中)N行之后,再单独处理M行(此时,使用CBF和哈希表中的键进行映射处理,不需要采样)。然后,再被采样处理N行,之后再单独处理M行。按该方式如此执行,直到所有数据被映射处理完毕。

在该算法中,将相对高频的键存储在哈希表中,将哈希表中相对低频的键调整到CBF中。如果所有的数据都流经CBF和哈希表,则可精确获得高频率键并存入哈希表。但是,这样做的开销太大。实际上,经实验验证,如果增大流经CBF和哈希表的数据量,对MapReduce性能将有明显提高,但是开销也将随之增加。在本文的实验部分,将数据分批进行单词统计,并分别设置每批数据中流经CBF和哈希表的数据占比为5%、10%和15%,用以确定相对合理的数据占比。

TCBP算法的主要作用是控制数据的流向,因此对于所有文本行均会进行1次处理。算法复杂度为O(n),其中n为所有文本行中的键数量。

2) DCBH算法。

DCBH算法如图 4所示。

图 4 DCBH算法伪代码

第1~3行:如果输入的键已经存在于哈希表中,则将对应键的频率值加1,并统计出频率最小的键,记为keyH-min。

第4~9行:如果输入的键尚未在哈希表中,则计算h1(key)、h2(key)、…、hk(key),并且对于CBF中对应的k个位的值加1,同时求CBF中频率最高的键的频率,记为keyCB-max

第10~21行:若哈希表此时已满,则比较keyH-min和keyCB-max的频率也就是minHash和maxBF的大小。如果maxBF>minHash,即其中CBF中已有键的频率已经超过哈希表中频率最低的键的频率,则将哈希表中keyH-min替换成CBF中的keyCB-max。同时,将CBF中的h1(keyCB-max)、h2(keyCB-max)、…、hk(keyCB-max)对应的值减去maxBF,并将h1(keyH-min)、h2(keyH-min)、…、hk(keyH-min)对应的值增加minHash。如果此时CBF也是满的,则将CBF中频率最小的keyCB-min及其对应的值溢出并写入环形内存。

第22~29行:若哈希表未满并且maxBF>minHash,则将keyCB-max及其对应的值存储在哈希表中。同时,将CBF中的h1(keyCB-max)、h2(keyCB-max)、…、hk(keyCB-max)对应的值减去maxBF。

综上所述,用CBF和哈希表来互相交换数据,使得出现频率较高的键由哈希表记录,频率较低的键由CBF记录。当CBF记录中的某个键其频率变得比较高的时候,就有可能取代哈希表中的某个键,哈希表中被取代的键则由CBF存储。

DCBH算法负责CBF与哈希表中高频键的交换,此时主要的计算部分为CBF中键的计数。算法中有部分数据会流经CBF和哈希表的采样过程,但数据占比一般不超过20%,因此计算复杂度为O(nk),n为所有文本行中的键数量,k为哈希函数的数量。

2.2 优化算法分析

文[12]中FB算法虽然针对以文本为中心应用的特点,对当前MapReduce进行了相关性能优化,但依然存在一些不足。为此,基于FB算法,本文提出动态获取高频率键,并在环形内存缓冲区前添加CBF和哈希表的优化策略,可进一步减少中间结果数据的数量,提高MapReduce的性能。

相比FB算法,本文提出的优化算法具有如下优势:

1) 该算法不需要额外采样。

该算法不需要通过额外采样来获取高频率键,更不需要计算采样率大小,只需间断性地通过CBF来动态调整哈希表中的高频率键。从而,节省了计算采样率以及通过额外采样求取高频率键而引入的额外开销。

2) 哈希表中的高频率键更接近真实情况。

通过CBF动态获取数据流中频率高的键,更能反映以文本为中心的应用中键的真实分布情况,不必考虑其高频率键的分布是集中还是平均。

3) 该算法是一个在线实时处理的过程。

该算法没有额外的采样过程,不需要计算采样率,能够动态实时地调整哈希表中的高频率键,因此是一个在线实时处理的过程。

3 实验及结果分析 3.1 实验平台选择及配置

实验的硬件平台为16台服务器,处理器为Intel(R) Xeon(R) CPU E5-2620 v3 @ 2.4 GHz、内存为64 GB。在每台服务器上安装Vmware虚拟机,每个虚拟机中搭建4个节点的Hadoop运行环境。在此基础上,分别运行Hadoop中原始MapReduce程序、文[12]优化后的MapReduce程序和本文优化后的MapReduce程序。使用不同大小规模的数据进行实际测试,并且同等规模数据在多次测试后求取平均运行时间来评估本文提出的优化算法。

其中,有63个从节点和1个主节点,DataNode和NodeManager位于从节点,Namenode节点和ResourceManager运行在主节点上,客户端通过主节点提交数据。表 1是测试环境的具体配置情况。

表 1 实验环境
具体环境 参数
操作系统 Ubuntu 16.40
每个节点内存 8 GB
硬盘容量 总共1.5 TB
Hadoop版本 2.5.0
Java版本 1.8.0
Winscp Winscp 5.7.5

3.2 评价指标

单词统计是典型的以文本为中心的应用,本文选择其进行本文优化算法的验证。通过对比3种MapReduce程序运行相同数据所花费的时间,来评估本文优化算法的性能。其中,运行的数据大小分别为256 GB、512 GB和1 TB,针对不同的数据集,多次运行程序,然后求取各自耗时的平均值。

3.3 实验结果及分析

1) 第一种情况:设定每批数据中前5%的数据流经CBF和哈希表。在处理数据的过程中,一个映射任务对应一个输入分片。一个HDFS块可能对应一个或多个输入分片。但是,在数据进入映射函数的时候,输入文件是按行进行读入和处理的。在实际实现中,可先读取一部分数据,让这些数据中的前5%流经CBF,然后CBF和哈希表进行交互,使得哈希表中存放高频率键。然后,再处理读取的后95%数据。在处理这些数据的时候,如果发现键在哈希表中已存在,那么哈希表中对应键的频率值就加1。反之,如果键在哈希表中不存在,则存储在环形内存缓冲区中。然后,再读取下一部分数据,让这些数据中的前5%流经CBF,然后CBF和哈希表进行交互,从而使得哈希表中的高频率键随着处理文本的不同部分而不断变化更新。重复上述过程,直到数据被处理完毕为止。

2) 第二种情况:设定每批数据中的前10%数据流经CBF和哈希表。过程与第一种情况类似,但是每次流经CBF的数据所占的比例为10%。

3) 第三种情况:设定每批数据中的前15%数据流经CBF和哈希表。过程与第一种情况类似,但是每次流经CBF的数据所占的比例为15%。

经过实验,3种不同数据占比对程序有不同影响,如图 5所示。可以看出,流经CBF和哈希表的数据占比控制在10%左右较为合理。如果占比太小(如5%),则存储在哈希表的高频率键不足以反映真实的情况;如果占比太大(如15%),则会使得CBF和哈希表不断地交换数据,从而导致更多的额外开销。故选择实验效果最好的数据占比为10%的程序与原始MapReduce程序以及文[12]优化程序进行对比。图 6为处理不同大小规模的数据时,3种MapReduce程序的运行时间对比。

图 5 流经CBF和哈希表的数据占比对本文优化程序运行时间的影响

图 6 3种MapReduce程序的运行时间对比

图 6表明,对于3种不同数据量的数据处理,本文的MapReduce程序的性能均优于原始MapReduce程序和文[12]中的程序。

图 6还可看出,当测试数据量的规模较小时,本文的MapReduce程序对性能的提高不是很明显,但是随着测试数据量的增加,本文的MapReduce程序对性能的提高变得越来越明显,相比原始MapReduce和文[12]优化程序,其性能分别平均提高17.04%和9.31%。

4 结论

本文提出一种基于动态获取高频率键的MapReduce优化算法。该算法的核心思想是在已有的FB算法的基础上,通过添加CBF和哈希表,动态实时地获得数据中的高频率键,然后动态更新哈希表中的键,从而在性能上获得较大的提高。实际测试结果表明在运行时间方面,本文算法优于原始的MapReduce和文[12]算法。

进一步的工作包括使算法自适应调整流经CBF和哈希表的数据占比,从而使数据占比更好地符合文本特点,并将算法用于更多文本应用。

参考文献
[1]
HILLS T T, NOGUCHI T, GIBBERT M. Information overload or search-amplified risk set size and order effects on decisions from experience[J]. Psychonomic Bulletin & Review, 2013, 20(5): 1023-1031.
[2]
ZIKOPOULOS P, EATON C. Understanding big data:Analytics for enterprise class Hadoop and streaming data[M]. New York: McGraw-Hill Osborne Media, 2011.
[3]
HERODOTOU H, BABU S. Profiling, what-if analysis, and cost-based optimization of mapreduce programs[J]. Proceedings of the VLDB Endowment, 2011, 4(11): 1111-1122.
[4]
FLORATOU A, PATEL J M, SHEKITA E J, et al. Column-oriented storage techniques for MapReduce[J]. Proceedings of the VLDB Endowment, 2011, 4(7): 419-429. DOI:10.14778/1988776
[5]
HE Y, LEE R, HUAI Y, et al. RCFile: A fast and space-efficient data placement structure in MapReduce-based warehouse systems[J]. 2011, 83(1): 1199-1208.
[6]
M AX, FAN X, LIU J, et al. vLocality:Revisiting data locality for MapReduce in virtualized clouds[J]. IEEE Network, 2017, 31(1): 28-35.
[7]
POLO J, CARRERA D, BECERRA Y, et al. Performance-driven task co-scheduling for mapreduce environments[C]//2010 IEEE Network Operations and Management Symposium-NOMS 2010. New York: IEEE, 2010: 373-380.
[8]
CHENG D, RAO J, GUO Y, et al. Improving performance of heterogeneous MapReduce clusters with adaptive task tuning[J]. IEEE Transactions on Parallel & Distributed Systems, 2017, 28(3): 774-786.
[9]
ZIPF G K. Selected studies of the principle of relative frequency in language[J]. Language, 1933, 9(1): 89-92. DOI:10.2307/409519
[10]
RATTANAOPAS K, KAEWKEEREE S. Improving Hadoop MapReduce performance with data compression: A study using wordcount job[C]//International Conference on Electrical Engineering/electronics, Computer, Telecommunications and Information Technology. New York: IEEE, 2017: 564-567.
[11]
ISSA J. Performance evaluation and estimation model using regression method for Hadoop wordcount[J]. IEEE Access, 2015, 3: 2784-2793. DOI:10.1109/ACCESS.2015.2509598
[12]
HSIAO C H, CAFARELLA M, NARAYANASAMY S. Reducing MapReduce abstraction costs for text-centric applications[C]//201443rd International Conference on Parallel Processing. New York: IEEE, 2014: 40-49.
[13]
NYANG D H. Counting bloom filter: U.S. Patent Application 15/021, 133[P]. 2013-10-14.
[14]
ROTTENSTREICH O, KANIZO Y, KESLASSY I. The variable increment counting Bloom filter[J]. IEEE/ACM Transactions on Networking, 2014, 22(4): 1092-1105. DOI:10.1109/TNET.2013.2272604