时东南
作者时东南2017-08-23 16:37
软件架构师, 外资银行

基于Hive和Spark的十档行情计算

字数 5002阅读 2794评论 0赞 5

摘要

十档行情指某只股票或者证券在某一时刻全市场的最低的十个卖出价格和最高的十个买入价格,每一个价格称之为一个档位。上海证券交易所市场监察部在进行数据协查和异常分析中需频繁使用个股任意时点的十档行情信息。临时还原档位行情数据的效率不高。因此,本文致力于基于Hive和Spark大数据技术,对全市场的十档行情进行还原(离线地计算出每一条记录所在时刻的前、后十档行情),从而大大提高数据协查和异常分析的速度。本文设置了多组对比实验,实验证明Spark技术在数据量大的计算速度增长较快,且证明了集群的参数对计算速度的影响非常大。另外,本文对计算过程中出现的数据倾斜进行了分析和一定程度的解决。

问题提出

档位行情是市场监管流程中的重要指标,是上交所监测用户盘中虚假申报等异常行为的计算要素之一。数据仓库依靠其分布式和分区索引的特点可以快速计算秒级档位行情。一般当业务需要计算精细到每一笔订单的档位行情时,基本上都会给定某些账户或某几只股票的条件。但是当需要计算全市场的每笔订单的档位行情时,数据仓库利用SQL通过委托、成交明细推算每笔订单前后的十档行情。其中涉及申报、成交、撤单等大数据量表关联及串行化计算等操作,耗时较长,有时可能会出现无法得到结果的情况。

为了解决多表关联耗时问题,本文利用Spark的高效内存计算在内存中进行表连接取代数据仓库中的表连接,以提升整体计算速度。对于串行化问题,基于分布式集群环境按股票代码进行并行化数据处理,极大地降低了计算时间。在实际实验中,计算及存储每条委托、成交、撤单记录前后各10档行情需43分钟(对于全市场2015年6月1号的脱敏数据为11,071 万条记录)。 本文分析了计算过程中出现的数据倾斜,通过Hive预处理和增大并行度进行了一定的优化。

1.技术简介

1.1Hive简介

Facebook为了解决海量日志数据的分析而开发了Hive——一种管理存储在分布式上的大数据集的数据仓库框架。

Hive建立在 Hadoop 上,提供了一系列的工具进行数据提取转化加载(ETL)。它定义了简单的类 SQL 查询语言,称为 HQL。它不仅允许熟悉 SQL 的用户查询数据,而且允许熟悉 map-reduce 开发者的开发自定义的 mapper 和 reducer 来处理无法完成的复杂的分析工作。

虽然Hive仍然使用SQL,但是和关系型数据库仍然存在着许多不同的地方。首先,与关系型数据库的存储文件的系统不同的是,Hive的存储文件系统是Hadoop的HDFS;其次,其计算模型是map-reduce,为海量数据挖掘而设计;再者,其具有易扩展存储能力和计算能力。本文将数据批量的导入到HDFS中,并以Hive外部表的parquet形式存储在集群中。借助Hive存储的分区性质,将数据比较均匀进行存储,方便后续读取数据和处理数据。

1.2 Spark简介

Spark是一个基于内存计算的运算的数据处理框架,提供处理大规模数据的并行分布式基础引擎,可使程序提高100倍的内存计算速度,或者10倍的磁盘计算速度。

Spark有5大核心组件:RDD、Spark Streaming、Spark SQL、MLlib和GraphX。其中,RDD是一个抽象的弹性数据集,支持基于工作集的应用,同时具有数据流模型的特点:自动容错、位置感知调度和可伸缩性。与分布式共享内存系统需要付出昂贵代价的检测点和回滚机制不同,它通过Lineage来重建丢失的分区以达到高效容错。SparkSQL可以利用SQL或者Hive的查询语法来查询数据。Spark Streaming支持实时流式数据处理。 MLlib是一个机器学习库,提供了为大规模集群计算所设计的分类、回归、聚类和协同过滤等机器学习算法。GraphX则是用于绘图和执行绘图并行计算的软件库,为ETL(探索性分析和反复的绘图计算)提供了一套统一的工具。本文通过Spark SQL从Hive表中读取数据转换为RDD,后续的档位计算是基于RDD的一系列的转换和动作操作。

2.还原十档行情

2.1 十档行情

十档行情指某只股票或者证券在某一时刻全市场的最低的十个卖出价格和最高的十个买入价格,每一个价格称之为一个档位。如下2图所示,在前一时刻股票代码为603958的股票买入十档价格从42.04到43.00,卖出的十档价格从42.15到42.42。当前时刻账户A以42.20的价格买入全部该股票,交易系统撮合成功后,成交之后更新该股票的十档行情。

1.png

1.png

2.2 算法设计

由于每只股票的档位行情是按时间顺序排序的,所以本文设计每只股票的档位行情计算串行计算,股票与股票之间并行化计算。基于上述特点设计如下数据流图,见图3。

  1. 读入某一日的全市场所有委托、成交、撤单记录。
  2. 按股票代码进行哈希分区,保证一只股票的所有记录仅存放在一个分区,各个分区并行计算档位行情。
  3. 通过一系列的转换操作后,将计算完成的档位数据写入到分区中。

2.png

2.png

其中,分区内部的串行计算档位行情算法如图4所示,

  1. 输入一条记录,根据委托、撤单、成交三种类型的记录,更新对应股票的剩余委托量。
  2. 根据剩余委托量更新对应股票买卖方向上的价格链表。
  3. 取得买链表最大的10个价格和卖链表上最小的10个价格写入该记录的10档行情字段中。

档位行情是由订单价格产生,而成交记录并没有存储股票在对应订单号下的订单价格。通常是采用成交表和委托表连接的方式获得。两张大表连接是非常耗时的,算法中通过维持一张哈希映射表解决表连接问题。具体做法是:通过在处理每一条委托记录时将委托价格字段存入哈希表中,在处理后续的成交记录时,可以根据股票代码、委托编号、买卖方向组成的主键,从哈希表中直接取出对应的委托价格。

3.png

3.png

上图是档位行情计算流程图(图3中Compute的流程图)(注:hash_stock_price_quantity:存放每只股票买、卖价格上的剩余委托量;hash_stock_pricelist:存放每只股票当前记录所在时刻的所有价格列表;hash_stock_ordno_price:存放每只股票订单号下的委托记录下的买、卖的价格)

3.实验设计

3.1环境参数

本文使用两个集群环境,如下表1所示。其中使用的Spark运行环境集群1参数如下表1所示。6台PC服务器(DL380,32核,128GB内存,1TB本地硬盘)构成的千兆网络集群。操作系统为64位RHEL 6.3,安装CDH 5.4.5(Hadoop 2.6.0),包含Hive 1.1.0,安装Spark 1.6.1,共用同一套HDFS文件系统。Spark采用YARN-Client集群模式。HDFS副本数配置为2。集群2与集群1除了表中的参数不同外,安装的CDH为5.7.1(Hadoop 2.6.4),包含Hive1.1.0,安装Spark1.6.0。

4.png

4.png

本文使用的数据源共两个,分为脱敏数据和模拟数据。其中脱敏数据的统计信息如表2所示。数据源为2015年6月1日上海证券市场的脱敏交易数据。全天有3,585万条订单记录,6,157万条成交记录和1,329万条撤单记录。其中记录长635字节,总共17G数据量。

5.png

5.png

模拟数据的统计信息如表3所示。数据源是根据一定的业务规则通过Python编程产生的模拟上海证券市场一天的交易数据。

6.png

6.png

3.2数据加载

交易的文本数据通过两步数据加载进入到Hive并以表的形式存储。第一步是文件加载,通过 hdfs dfs -put 命令,将日志文件加载到HDFS文件系统中,在这一过程中,数据被分块后通过网络进行分布式存储。第二步是表加载,将日志文件映射为外部表,通过Hive的insert overwrite table指令,将交易记录转换为关系表,以Parquet列式压缩格式存入HDFS文件系统。

3.3对比实验

3.3.1不同数据量的计算性能对比

本实验采用随机从脱敏数据中选取不同数据量的数据进行计算。该实验的计算集群是集群1。不同数据计算性能变化的结果如下表所示:

7.png

7.png

8.png
8.png

从图5所示可以看出,使用脱敏数据集时,随着记录的增加,计算时间2~4倍的时间增加。这说明不同的数据集会影响Spark的计算性能。当数据量在越来越大时,脱敏数据集的计算速度的优势逐渐显现。综上说明数据集分布对spark的计算性能存在很大影响。

3.3.2不同集群计算性能变化情况

本实验采用随机从模拟数据中选取不同数据量的数据进行计算。该实验的计算集群是集群1和集群2。

9.png

9.png

10.png
10.png

从图6所示可以看出,同等数据量时,集群1的计算速度比集群2的计算速度有明显的优势。随着数据量的增大,集群1优势更加明显(集群2在数据量达到5千万时,任务直接失败)。这说明集群的各项硬件指标如机器数、内存和核数会影响Spark在集群中的计算性能。

3.4全市场一天脱敏数据实验

从上文的两个对比实验中,本文得出结论Spark的计算性能会随着数据量和集群的配置的不同而不同。Spark在数据量比较大和集群配置的内存、核数等比较大时性能优势更加明显。因此,在计算上海证券市场2015年6月1号全市场的数据的档位行情时,本实验选择在集群1上进行计算(其中,相关的参数设置如下表7所示)。结果如下表6所示。

11.png

11.png

12.png
12.png

实验结果如上表6所示,Spark集群在处理11,071万条数据的情况下,平均1万条记录的处理时间是0.23秒,并且每只股票的处理时间是2.78秒。

4.数据倾斜问题

在计算过程中,本文对任务执行过程中出现的数据倾斜问题进行了分析。

13.png

13.png

本文使用yarn-client模式提交,从本地的log中找到计算任务共运行了几个阶段(stage),如下图7所示,任务的阶段4(stage4)和阶段5(stage5)。

首先查看了两个阶段所用的时间如下图8所示,阶段4用了5分钟,阶段5用了57分钟,即任务的大部分时间都在花在了阶段 5上。

14.png

14.png

因此,进入到阶段5中查看各个执行者(Executor)的运行时间(图9)。明显可以看到,有的任务运行特别快,只需要37秒钟就运行完;而有的任务运行特别慢,需要56分钟才运行完,此时单从运行时间上看就已经能够确定发生数据倾斜了。此外,从图中发现,运行时间特别短的任务只需要处理几千KB的数据,而运行时间特别长的任务需要处理几百MB的数据,处理的数据量差了100倍。此时能够确定在ReduceByKey阶段发生了数据倾斜。

15.png

15.png

16.png
16.png

确定数据发生倾斜之后,本文实验是根据股票代码进行哈希分布的,所以首先根据股票代码统计了每只股票的记录数如图10所示。其中图10统计了所有股票的记录数并进行排序,可以看出数据分布确实存在倾斜。其中某些股票的记录数达到百万以上,甚至其中一只股票的成交、申报和撤单记录总数近480万条。

由于上述的数据倾斜导致ReduceByKey过程中的Shuffle算子分配到个别的某些任务的数据过多,导致该任务的执行时间过长,从而导致整个计算的时间变长。本文首先使用Hive进行预处理使得存在Hive中的表的数据是比较均匀的。但是由于在ReduceByKey的时候仍然是根据股票代码进行哈希的,所以效果不是很明显。其次,本文将shuffle操作(RDD在执行ReduceByKey操作中执行了shuffle算子)的并行度增加到400缓解数据倾斜问题。经过上述两步之后,计算速度得到了一定的提升,最终的计算时间43.2分钟。

5.总结和展望

根据实验结果可发现Spark方法的计算性能得到大幅提高,而且当数据量越来越大时,性能优势更加明显。未来考虑将上述行情10档计算开发为相关数据产品,如历史行情档位回放、历史10档行情切片查询等数据产品,服务于市场投资分析用户,进一步运用Spark技术发挥数据的市场价值。Spark除了支持快速进行离线批量处理,Spark 中的Spark Streaming实时处理技术也可对档位行情进行在线实时处理,后续计划结合大数据平台暨新监察系统技术咨询项目验证Spark Streaming实时处理委托数据生成10档行情切片的能力。

另外,在Spark的分布式计算中数据倾斜是一个比较棘手的问题,未来我们将探索从数据结构方面解决数据倾斜问题。

如果觉得我的文章对您有用,请点赞。您的支持将鼓励我继续创作!

5

添加新评论0 条评论

Ctrl+Enter 发表

作者其他文章

相关问题

相关资料

X社区推广