第12卷第5期 智能系统学报 Vol.12 No.5 2017年10月 CAAI Transactions on Intelligent Systems 0ct.2017 D0I:10.11992/is.201706016 网络出版地址:htp:/kns.cmki.net/kcms/detail/23.1538.TP.20171021.1350.014.html 基于SQL-on-Hadoop查询引擎的日志挖掘及其应用 何明,常盟盟,刘郭洋2,顾程祥2,彭继克2 (1.北京工业大学信息学部,北京100124:2.海通证券股份有限公司信息技术管理部,上海200001) 摘要:随着计算机和网络技术的迅猛发展以及数据获取手段的不断丰富,海量数据的实时处理需求日益增多,传 统的日志分析技术在处理海量数据时存在计算瓶颈。大数据时代下,随着开放式处理平台的发展,能够处理大规模 且多样化数据的大数据处理系统应运而生。为了让原有的业务能够充分利用Hop的优势,本文首先研究了基于 大数据技术的网络日志分析方法,构建了网络日志分析平台以实现万亿级日志采集、解析、存储和高效、灵活的查询 与计算。对比分析了Hiwe、Impala和Spark SOL这3种具有代表性的SQL-om-Hadoop查询系统实例.并展示了这类系 统的性能特点。采用TPCH测试基准对它们的决策支持能力进行测试及评估,通过对实验数据的分析和解释得到了 若干有益的结论。实现了海量日志数据计算与分析在证券领域的几种典型应用,为进一步的研究工作奠定了基础。 关键词:大数据;日志分析:数据挖掘;Hadoop:查询引擎;数据采集:索引存储;证券行业 中图分类号:TP391文献标志码:A文章编号:1673-4785(2017)05-0717-12 中文引用格式:何明,常盟盟,刘郭洋,等.基于SQL-on-Hadoop查询引擎的日志挖掘及其应用[J].智能系统学报,2017,12(5): 717-728. 英文引用格式:HE Ming,CHANG Mengmeng,LIU Guoyang,etal.Log mining and application based on sql-on-hadoop query engine[J].CAAI transactions on intelligent systems,2017,12(5):717-728. Log mining and application based on sql-on-hadoop query engine HE Ming',CHANG Mengmeng',LIU Guoyang?,GU Chengxiang?,PENG Jike2 (1.Faculty of Information Technology,Beijing University of Technology,Beijing 100124,China;2.Information Technology Management Department,Haitong Securities Co.,Ltd.,Shanghai 200001,China) Abstract:With the rapid development of computing and networking technologies,and the increase in the number of data acquisition methods,the demand for real-time processing of massive amounts of log data is increasing every day,and there is a calculation bottleneck when traditional log analysis technology is used to process massive amounts of data.With the development of open processing platforms in the era of big data,a number of big data processing systems have emerged for dealing with large-scale and diverse data.To effectively apply the advantages of Hadoop to the original businesses,in this study,we first investigated network log analysis methods based on big data technology and constructed a network log analysis platform for the acquisition,analysis,storage,high- efficiency and flexible queries,and the calculation of trillions of log entries.In addition,we compared and analyzed three representative SQL-on-Hadoop query systems including Hive,Impala,and Spark SQL,and identified the performance characteristics of this type of system.We used the TPC-H testing reference to test and assess their decision-making support abilities.We drew some useful conclusions from the analysis of the experimental data.We also suggest a few typical applications for this analysis and processing system for massive log data in the securities fields,which provides a solid foundation for further research. Keywords:big data;log analysis;data mining;Hadoop;query engine;data collection;indexed storage; securities business 随着互联网的飞速发展和逐层推进,企业内部 业网络中的计算机设备和网络组件持久地记录着 的规模和业务量也不断增加,致使数据量猛增。企 海量的网络日志。日志文件是系统软硬件信息和 用户行为信息记录的载体,通过日志分析能够实时 收稿日期:2017-06-07.网络出版日期:2017-10-21. 基金项目:国家自然科学基金项目(91646201.91546111,60803086):国家科 获取设备、网络运行状态和用户行为交易等信息, 技支撑计划子课题(2013BAH2IB02-01);北京市自然科学基金 项目(4153058,4113076):北京市教委重点项目有利于保证系统的稳定运行和来往业务的安全性。 (KZ20160005009):北京市教委面上项目(KM201710005023). 通信作者:何明.E-mail:heming(@bjut.cdu.cn. 目前,较为成熟的日志集中管理系统解决了各类设
第 12 卷第 5 期 智 能 系 统 学 报 Vol.12 №.5 2017 年 10 月 CAAI Transactions on Intelligent Systems Oct. 2017 DOI:10.11992 / tis.201706016 网络出版地址:http: / / kns.cnki.net / kcms/ detail / 23.1538.TP.20171021.1350.014.html 基于 SQL⁃on⁃Hadoop 查询引擎的日志挖掘及其应用 何明1 ,常盟盟1 ,刘郭洋2 ,顾程祥2 ,彭继克2 (1.北京工业大学 信息学部,北京 100124; 2. 海通证券股份有限公司 信息技术管理部,上海 200001) 摘 要:随着计算机和网络技术的迅猛发展以及数据获取手段的不断丰富,海量数据的实时处理需求日益增多,传 统的日志分析技术在处理海量数据时存在计算瓶颈。 大数据时代下,随着开放式处理平台的发展,能够处理大规模 且多样化数据的大数据处理系统应运而生。 为了让原有的业务能够充分利用 Hadoop 的优势,本文首先研究了基于 大数据技术的网络日志分析方法,构建了网络日志分析平台以实现万亿级日志采集、解析、存储和高效、灵活的查询 与计算。 对比分析了 Hive、Impala 和 Spark SQL 这 3 种具有代表性的 SQL⁃on⁃Hadoop 查询系统实例,并展示了这类系 统的性能特点。 采用 TPC⁃H 测试基准对它们的决策支持能力进行测试及评估,通过对实验数据的分析和解释得到了 若干有益的结论。 实现了海量日志数据计算与分析在证券领域的几种典型应用,为进一步的研究工作奠定了基础。 关键词:大数据;日志分析;数据挖掘;Hadoop;查询引擎;数据采集;索引存储;证券行业 中图分类号:TP391 文献标志码:A 文章编号:1673-4785(2017)05-0717-12 中文引用格式:何明,常盟盟,刘郭洋,等. 基于 SQL⁃on⁃Hadoop 查询引擎的日志挖掘及其应用[ J]. 智能系统学报, 2017, 12(5): 717-728. 英文引用格式: HE Ming,CHANG Mengmeng,LIU Guoyang, et al. Log mining and application based on sql⁃on⁃hadoop query engine[J]. CAAI transactions on intelligent systems, 2017, 12(5): 717-728. Log mining and application based on sql⁃on⁃hadoop query engine HE Ming 1 , CHANG Mengmeng 1 , LIU Guoyang 2 , GU Chengxiang 2 , PENG Jike 2 (1. Faculty of Information Technology, Beijing University of Technology, Beijing 100124, China; 2. Information Technology Management Department, Haitong Securities Co., Ltd., Shanghai 200001, China) Abstract:With the rapid development of computing and networking technologies, and the increase in the number of data acquisition methods, the demand for real⁃time processing of massive amounts of log data is increasing every day, and there is a calculation bottleneck when traditional log analysis technology is used to process massive amounts of data. With the development of open processing platforms in the era of big data, a number of big data processing systems have emerged for dealing with large⁃scale and diverse data. To effectively apply the advantages of Hadoop to the original businesses, in this study, we first investigated network log analysis methods based on big data technology and constructed a network log analysis platform for the acquisition, analysis, storage, high⁃ efficiency and flexible queries, and the calculation of trillions of log entries. In addition, we compared and analyzed three representative SQL⁃on⁃Hadoop query systems including Hive, Impala, and Spark SQL, and identified the performance characteristics of this type of system. We used the TPC⁃H testing reference to test and assess their decision⁃making support abilities. We drew some useful conclusions from the analysis of the experimental data. We also suggest a few typical applications for this analysis and processing system for massive log data in the securities fields, which provides a solid foundation for further research. Keywords: big data; log analysis; data mining; Hadoop; query engine; data collection; indexed storage; securities business 收稿日期:2017-06-07. 网络出版日期:2017-10-21. 基金项目:国家自然科学基金项目(91646201, 91546111, 60803086); 国家科 技支撑计划子课题(2013BAH21B02-01); 北京市自然科学基金 项 目 ( 4153058, 4113076 ); 北 京 市 教 委 重 点 项 目 (KZ20160005009); 北京市教委面上项目(KM201710005023). 通信作者:何明. E⁃mail:heming@ bjut.edu.cn. 随着互联网的飞速发展和逐层推进,企业内部 的规模和业务量也不断增加,致使数据量猛增。 企 业网络中的计算机设备和网络组件持久地记录着 海量的网络日志。 日志文件是系统软硬件信息和 用户行为信息记录的载体,通过日志分析能够实时 获取设备、网络运行状态和用户行为交易等信息, 有利于保证系统的稳定运行和来往业务的安全性。 目前,较为成熟的日志集中管理系统解决了各类设
·718· 智能系统学报 第12卷 备、服务器和应用日志的采集与格式统一问题,日 得到了若干有益的结论: 志分析也从最初简单的正则匹配向结构化查询、报 3)实现了大规模网络日志数据分析与计算在证 表和预测演进山。越来越多的行业领域面临海量 券领域的几种典型应用。 (volume)、高速(velocity)和多样(variety)等多V挑 1 相关工作 战,大数据时代已真正到来[2-)。 互联网中海量的信息为证券领域日志分析提 大数据技术在互联网领域海量网络日志分析和 供了丰富的数据支撑,如何利用大数据分析技术进 处理过程中得到了广泛的应用,日志分析系统主要 行实时准确的日志分析成为重要的科学问题。在 包括日志同步、数据存储、分布式计算和数据仓库等 大型证券公司的内部网络中,随着网络带宽的迅速 相关技术。开源的日志分析系统如Facebook的 扩容日志量急剧增长且日志源众多,包括网上交易 Scribet6],Apache Chukwat7],LinkedIn Kafkats], 日志、移动证券日志和网站日志等主要系统的日 Cloudera的Flume)等。Facebook公司庞大的用户群 志。以海通证券为例,目前在全国设有几十个节 体产生了大量的信息与社交数据,现有8亿多用户 点,几百台服务器,峰值在线用户约几十万,每个节 的信息需要处理,产生了大规模的数据和日志:同 点各部署了1台负载均衡设备。网上交易应用服务 时,离线的大规模数据分析计算已无法满足实时数 器全天24小时将客户请求数据与应答数据实时或 据分析的用户需求,Scribe结合了Google的分布式 小批量定时写入磁盘日志文件,每台交易应用服务 文件系统GFS[10](google file system,GFS)。操作流 器的日志文件大小为100MB~3GB,总计在100GB 程是收集异构数据源上的日志,集中存储到分布式 左右。同时,每台网上交易应用服务器还会生成一 文件系统,从而在此基础上进行统计分析。Amazon 份发送给柜台程序的网关日志数据。此外,各节点 基于S3和EC2,开发了Amazon EMR来提供大数据 负载均衡设备的日志采用SNMP协议进行采集,采 处理服务,可以将数据分布在可重新调整大小的 集每个站点的网络流量、用户连接数据。每日合计 EC2集群中进行处理,包括日志分析、索引、数据仓 有3亿多条日志,总量共计约300GB。仅上述3类 库和机器学习等。阿里巴巴集团使用目前国内最 日志存储一年就将产生约108TB数据,若接入更多 大的Hadoop集群“云梯”进行各部门产品的线上数 设备、操作系统、业务平台日志,数据规模则更大。 据备份、系统日志以及爬虫数据分析,并建设开放 传统的日志处理方法在面对海量大数据时,其存储 平台为个人和企业提供各种增值服务。腾讯微信 方式和计算能力都受到了限制,因此分布式存储和 等应用产品拥有上亿级别的用户,产生了海量的个 并行计算成为了新的发展趋势。如何采集、传输、 人用户日志数据,这些数据中蕴藏着巨大的商业价 存储、分析及应用大规模的日志数据,已成为证券 值,并提出“大数据营销”的概念。人人网基于 行业在大数据时代下面临的重大挑战。 Hadoop的Hive)、HBase[12]和Streamingl]组件, Hadoop)分布式处理平台为大数据存储和分析 构建了SNS推荐平台进行分析计算、内容推荐等工 提供了有效的解决方案。在大数据应用方面,虽然 作。百度的高性能计算系统规划中的架构将有超 学术界和工业界对大数据的关注各有侧重,但有一 过1万个节点,每天的数据生成量在10PB以上,主 个共同的认识:大数据只有和具体的行业深入结合 要用于日志的存储分析以及统计挖掘等功能。Wi 才能落到实处,才能产生真正的价值。通过前期的 等设计了Analysis Farm摒弃了传统的关系型数据 积累和算法的升级,大数据应用将对证券行业产生 (relational database management system,RDBMS), 革命性影响。 利用NoSQL(not only SQL)数据库MongoDB构建了 本文的主要贡献如下: 可横向扩展的日志分析平台,以支撑NetFlow日志 1)研究基于SQL-on-Hadoop查询系统的性能特 存储和查询4。Rabkin等设计了基于Hadoop的日 点,对比分析了Hive、Impala和Spark SQL这3种具 志收集和分析系统Chukwa,日志处理程序在 有代表性的SQL-on-Hadoop查询系统实例,构建了 MapReduce框架上开发。文献[l6-17]从原位 海量日志采集与实时计算分析平台: 分析的角度出发,分别实现了针对大规模日志分析 2)采用TPC-H测试基准对它们的决策支持能 的MapReduce(In-situ MapReduce)和Continuous处 力进行测试及评估,通过对实验数据的分析和解释 理机制,但MapReduce模型计算代价很大,并不能
备、服务器和应用日志的采集与格式统一问题,日 志分析也从最初简单的正则匹配向结构化查询、报 表和预测演进[1] 。 越来越多的行业领域面临海量 (volume)、高速(velocity)和多样(variety)等多 V 挑 战,大数据时代已真正到来[2-4] 。 互联网中海量的信息为证券领域日志分析提 供了丰富的数据支撑,如何利用大数据分析技术进 行实时准确的日志分析成为重要的科学问题。 在 大型证券公司的内部网络中,随着网络带宽的迅速 扩容日志量急剧增长且日志源众多,包括网上交易 日志、移动证券日志和网站日志等主要系统的日 志。 以海通证券为例,目前在全国设有几十个节 点,几百台服务器,峰值在线用户约几十万,每个节 点各部署了 1 台负载均衡设备。 网上交易应用服务 器全天 24 小时将客户请求数据与应答数据实时或 小批量定时写入磁盘日志文件,每台交易应用服务 器的日志文件大小为 100 MB~3 GB,总计在 100 GB 左右。 同时,每台网上交易应用服务器还会生成一 份发送给柜台程序的网关日志数据。 此外,各节点 负载均衡设备的日志采用 SNMP 协议进行采集,采 集每个站点的网络流量、用户连接数据。 每日合计 有 3 亿多条日志,总量共计约 300 GB。 仅上述 3 类 日志存储一年就将产生约 108 TB 数据,若接入更多 设备、操作系统、业务平台日志,数据规模则更大。 传统的日志处理方法在面对海量大数据时,其存储 方式和计算能力都受到了限制,因此分布式存储和 并行计算成为了新的发展趋势。 如何采集、传输、 存储、分析及应用大规模的日志数据,已成为证券 行业在大数据时代下面临的重大挑战。 Hadoop [5]分布式处理平台为大数据存储和分析 提供了有效的解决方案。 在大数据应用方面,虽然 学术界和工业界对大数据的关注各有侧重,但有一 个共同的认识:大数据只有和具体的行业深入结合 才能落到实处,才能产生真正的价值。 通过前期的 积累和算法的升级,大数据应用将对证券行业产生 革命性影响。 本文的主要贡献如下: 1)研究基于 SQL⁃on⁃Hadoop 查询系统的性能特 点,对比分析了 Hive、Impala 和 Spark SQL 这 3 种具 有代表性的 SQL⁃on⁃Hadoop 查询系统实例,构建了 海量日志采集与实时计算分析平台; 2)采用 TPC⁃H 测试基准对它们的决策支持能 力进行测试及评估,通过对实验数据的分析和解释 得到了若干有益的结论; 3)实现了大规模网络日志数据分析与计算在证 券领域的几种典型应用。 1 相关工作 大数据技术在互联网领域海量网络日志分析和 处理过程中得到了广泛的应用,日志分析系统主要 包括日志同步、数据存储、分布式计算和数据仓库等 相关 技 术。 开 源 的 日 志 分 析 系 统 如 Facebook 的 Scribe [6] ,Apache 的 Chukwa [7] , LinkedIn 的 Kafka [8] , Cloudera 的 Flume [9]等。 Facebook 公司庞大的用户群 体产生了大量的信息与社交数据,现有 8 亿多用户 的信息需要处理,产生了大规模的数据和日志;同 时,离线的大规模数据分析计算已无法满足实时数 据分析的用户需求, Scribe 结合了 Google 的分布式 文件系统 GFS [10] (google file system,GFS)。 操作流 程是收集异构数据源上的日志,集中存储到分布式 文件系统,从而在此基础上进行统计分析。 Amazon 基于 S3 和 EC2,开发了 Amazon EMR 来提供大数据 处理服务,可以将数据分布在可重新调整大小的 EC2 集群中进行处理,包括日志分析、索引、数据仓 库和机器学习等。 阿里巴巴集团使用目前国内最 大的 Hadoop 集群“云梯”进行各部门产品的线上数 据备份、系统日志以及爬虫数据分析,并建设开放 平台为个人和企业提供各种增值服务。 腾讯微信 等应用产品拥有上亿级别的用户,产生了海量的个 人用户日志数据,这些数据中蕴藏着巨大的商业价 值,并提出 “ 大 数 据 营 销” 的 概 念。 人 人 网 基 于 Hadoop 的 Hive [11] 、HBase [12] 和 Streaming [13] 组件, 构建了 SNS 推荐平台进行分析计算、内容推荐等工 作。 百度的高性能计算系统规划中的架构将有超 过 1 万个节点,每天的数据生成量在 10 PB 以上,主 要用于日志的存储分析以及统计挖掘等功能。 Wei 等设计了 Analysis Farm 摒弃了传统的关系型数据 库(relational database management system,RDBMS), 利用 NoSQL(not only SQL)数据库 MongoDB 构建了 可横向扩展的日志分析平台,以支撑 NetFlow 日志 存储和查询[14] 。 Rabkin 等设计了基于 Hadoop 的日 志收 集 和 分 析 系 统 Chukwa, 日 志 处 理 程 序 在 MapReduce 框架上开发[15] 。 文献[ 16 - 17] 从原位 分析的角度出发,分别实现了针对大规模日志分析 的 MapReduce( In⁃situ MapReduce) 和 Continuous 处 理机制, 但 MapReduce 模型计算代价很大,并不能 ·718· 智 能 系 统 学 报 第 12 卷
第5期 何明,等:基于SQL-on-Hadoop查询引擎的日志挖掘及其应用 ·719· 很好地支持迭代运算。 主要分为文本数据、数据库数据和实时/准实时数 然而HDFS1]和MapReducet]大数据处理架构 据等。 主要是针对静态数据的批处理,在运算过程中产生 2.1HDS数据采集 的大量/O操作无法保证处理过程的实时性。针对 网络日志的生成是分布式的,与传统的日志管 上述问题,本文将研究基于SQL-on-Hadoop查询引 理系统一样,日志采集是本文平台的基础。本文平 擎构建网络日志分析平台,通过使用广泛的标准 台采集的日志直接存储在Hadoop文件系统 SQL语言来实现快速、灵活的查询性能。通过利用 (HDFS)中,由于平台构建于Hadoop之上,能够处 TB级日志数据对存储、查询性能进行测试、优化和 理海量分布式存储的日志数据,同时易于水平扩 比较,构建具有稳定性、高性能、可扩展性、易用性 展,本文的日志数据基本流程按功能可划分为5层, 和安全性的网络日志统一采集查询和监控平台,以 如图1所示。 满足对TB或PB级容量和万亿日志管理的应用需 1)原始数据层:业务上完成日志格式梳理,系统 求,为面向证券行业的日志大数据分析及其应用提 运行日志支持实时访问和采集接口。 供技术支撑。 2)数据采集层:主要负责通用的日志数据解析 2基于Hadoop的结构化数据处理 高效采集和安全可控。 3)数据处理层:主要包括对日志数据的批量式 网络日志源的种类具有多样性的特点,包括结 处理和实时处理。 构化、半结构化和非结构化的数据。不同类型的日 4)数据服务层:主要提供标准的数据访问接口 志存储方式有所不同。日志管理系统的采集器对 ODBC、JDBC、HIVE等。 不同格式的日志进行标准化处理,从而以结构化的 5)数据展示层:实现实时监控类和报表类数据 形式进行日志存储和分析。本文所采用的源数据 的展示。 数据处理 原始数据层 存储后处理 External Database HDFS Map/Reduce 数据服务层 :数据展示层 ”””””” ODBC Statistics Report Textfile 数据采集层 JDBC 实时处理 HIVE OLAP Network Sprak Data Cache Streaming 图1日志数据处理基本流程 Fig.1 Basic log data processing framework 根据应用需求,本文日志的采集方式分为以下on-Hadoop处理结果输出到RDBMS,供现有的日志 3种。 分析系统进行报表及可视化处理。 1)文件导人:对已分布在个服务器磁盘的日志 2.2SQL-on-Hadoop查询引擎 文件,经网络文件系统挂载,直接将日志文件导入 SQL是结构化数据的查询语言,SQL-on-Hadoop HDFS。该方式允许日志文件批量可靠导入,可在网 是构建在Hadoop之上的SQL查询系统,利用 络利用率低谷时段进行传送。 Hadoop能够进行海量数据(TB级别以上)的处理。 2)流数据导入:基于Apache Flume[2o]构建,实 目前已有的SQL-on-Hadoop系统大致可以分为两大 类:第一类将SQL查询转换为Map-Reduce job;第二 现多个日志源数据实时汇聚,接收网上交易应用服 类系统基于MPP(massively parallel processing)的设 务器和网络设备发送的日志。 计方式,仅仅使用Hadoop作为存储引擎,上层自行 3)RDBMS导人:为实现与现有日志系统兼容, 实现分布式查询的逻辑。第一类系统的代表是 基于Apache Sqoop2,实现与Oracle、MSQL和 Facebook的Hive。Hive是原始的SQL-on-Hadoop解 PostgreSQL等RDBMS对接,支持直接导入存储在上 决方案。它是一个开源的Java项目,能够将SQL转 述数据库中的数据记录。Sqoop同时可以将SQL 换成一系列可以在标准的Hadoop TaskTrackers上运
很好地支持迭代运算。 然而 HDFS [18]和 MapReduce [19]大数据处理架构 主要是针对静态数据的批处理,在运算过程中产生 的大量 I/ O 操作无法保证处理过程的实时性。 针对 上述问题,本文将研究基于 SQL⁃on⁃Hadoop 查询引 擎构建网络日志分析平台,通过使用广泛的标准 SQL 语言来实现快速、灵活的查询性能。 通过利用 TB 级日志数据对存储、查询性能进行测试、优化和 比较,构建具有稳定性、高性能、可扩展性、易用性 和安全性的网络日志统一采集查询和监控平台,以 满足对 TB 或 PB 级容量和万亿日志管理的应用需 求,为面向证券行业的日志大数据分析及其应用提 供技术支撑。 2 基于 Hadoop 的结构化数据处理 网络日志源的种类具有多样性的特点,包括结 构化、半结构化和非结构化的数据。 不同类型的日 志存储方式有所不同。 日志管理系统的采集器对 不同格式的日志进行标准化处理,从而以结构化的 形式进行日志存储和分析。 本文所采用的源数据 主要分为文本数据、数据库数据和实时/ 准实时数 据等。 2.1 HDFS 数据采集 网络日志的生成是分布式的,与传统的日志管 理系统一样,日志采集是本文平台的基础。 本文平 台采 集 的 日 志 直 接 存 储 在 Hadoop 文 件 系 统 (HDFS)中,由于平台构建于 Hadoop 之上,能够处 理海量分布式存储的日志数据,同时易于水平扩 展,本文的日志数据基本流程按功能可划分为 5 层, 如图 1 所示。 1)原始数据层:业务上完成日志格式梳理,系统 运行日志支持实时访问和采集接口。 2)数据采集层:主要负责通用的日志数据解析、 高效采集和安全可控。 3)数据处理层:主要包括对日志数据的批量式 处理和实时处理。 4)数据服务层:主要提供标准的数据访问接口 ODBC、JDBC、HIVE 等。 5)数据展示层:实现实时监控类和报表类数据 的展示。 图 1 日志数据处理基本流程 Fig.1 Basic log data processing framework 根据应用需求,本文日志的采集方式分为以下 3 种。 1)文件导入:对已分布在个服务器磁盘的日志 文件,经网络文件系统挂载,直接将日志文件导入 HDFS。 该方式允许日志文件批量可靠导入,可在网 络利用率低谷时段进行传送。 2)流数据导入:基于 Apache Flume [20] 构建,实 现多个日志源数据实时汇聚,接收网上交易应用服 务器和网络设备发送的日志。 3)RDBMS 导入:为实现与现有日志系统兼容, 基于 Apache Sqoop [21] , 实 现 与 Oracle、 MySQL 和 PostgreSQL 等 RDBMS 对接,支持直接导入存储在上 述数据库中的数据记录。 Sqoop 同时可以将 SQL⁃ on⁃Hadoop 处理结果输出到 RDBMS,供现有的日志 分析系统进行报表及可视化处理。 2.2 SQL⁃on⁃Hadoop 查询引擎 SQL 是结构化数据的查询语言,SQL⁃on⁃Hadoop 是构 建 在 Hadoop 之 上 的 SQL 查 询 系 统, 利 用 Hadoop 能够进行海量数据( TB 级别以上)的处理。 目前已有的 SQL⁃on⁃Hadoop 系统大致可以分为两大 类:第一类将 SQL 查询转换为 Map⁃Reduce job;第二 类系统基于 MPP(massively parallel processing)的设 计方式,仅仅使用 Hadoop 作为存储引擎,上层自行 实现分布式查询的逻辑。 第一类系统的代表是 Facebook 的 Hive。 Hive 是原始的 SQL⁃on⁃Hadoop 解 决方案。 它是一个开源的 Java 项目,能够将 SQL 转 换成一系列可以在标准的 Hadoop TaskTrackers 上运 第 5 期 何明,等:基于 SQL⁃on⁃Hadoop 查询引擎的日志挖掘及其应用 ·719·
·720 智能系统学报 第12卷 行的MapReduce任务。如图2中的Hive架构部分 Impala并没有使用MapReduce执行查询,而是使用 所示,Hive通过一个metastore(本身就是一个数据 了自己的执行守护进程操作本地磁盘文件。由于 库)存储表模式、分区和位置以期提供像MySQL一 没有MapReduce开销以及磁盘/O、查询语句编译 样的功能。它支持大部分MySQL语法,同时使用相 等一系列优化,Impala通常要比Hive具有更快的数 似的database/able/view约定组织数据集。Hive内 据访问性能[s]。Impala共享Hive的metastore,可直 部机制是基于MapReduce,从而导致了计算过程中 接与Hive管理的数据互操作。Spark[24]使用轻量级 消耗大量的/0,降低了运行效率。Impala2]是由 的线程作为执行器,减少了执行作业的开销,同时 Cloudera构建的一个针对Hadoop的开源的MPP 提高了调度的响应速度,如图2中的Spark部分所 (massively parallel processing)“交互式”SQL查询引 示。Spark SQL是在Spark之上搭建的SQL查询引 擎。Impala同样提供了一种SQL查询方法,如图2 擎,支持在Spark中使用Sql、HiveSql、Scala中的关 中的Impala架构部分所示,与Hive不同的是, 系型查询表达式。 Impala Hive SQL APP JDBC Statestore Catalog CLI JDBC/ODBC HUE ODBC Impalad Impalad Impalad Query Planner Query Planner Query Planner Thrift Server MetaStore Query Coordinator Query Coordinator Query Coordinator Query Executor Query Executor Query Executor Compiler Optimizer Executor Hadoop Spark Standalone Job Tracker Name Node Spark Context Spark Master 1 Data Node Data Node Data Node Spark Worker Spark Worker Spark Worker 'ask Tracker Task Tracker Task Tracker kecutor Backenc Executor Backend Executor Backend 图2 Hadoop、Hive、Impala与Spark执行结构图 Fig.2 Structure for implementation of Hadoop,Hive,Impala and Spark 2.3结构化数据存储与压缩 多种格式的数据格式的支持。Text是原始的文本数 目前,很多研究者提出了在Hadoop中优化结构 据,通常为CSV或其他特定字符分隔。Hive的格式 化数据存储的方法。He等[2]提出的RCFile格式旨 支持更为全面,由于Impala和Hive共享metastore, 在提高数据导人和处理效率。它首先将数据水平 因此本文平台实际应用中通常由Hive导入数据而 分割为多个行组(ov-group),然后对每个组内的数 后台使用Spark SQL查询。 据垂直分割成列存储。列存储将数据表同一列的 表1Hive、Impala和Spark SOL数据格式支持比较 数据连续存放,当查询只涉及部分列时,可大幅减 Table 1 Data format comparison of Hive,Impala and 少所需读取的数据量。ORC(optimized RCFile)是对 Spark SQL RCFile的改进,解决其在数据类型和性能上的多个 数据 Hive Impala Spark SQL 局限性,改善查询和空间利用效率。Parquet是 格式 查询插入 查询插人 查询 插入 Hadoop生态圈中一种新型列式存储格式,灵感来自 于2010年Google发表的Dremel论文[2],它可以兼 Text 容Hadoop生态圈中大多数生态框架(Hadoop、Spark RCFile 等),被多种查询引擎支持(Hive、Impala、Spark SQL、 ORC Dill等),并且它与语言和平台无关的。表1比较 了本文2.2节描述的3种查询引擎从HDFS上读取 Parquet
行的 MapReduce 任务。 如图 2 中的 Hive 架构部分 所示,Hive 通过一个 metastore(本身就是一个数据 库)存储表模式、分区和位置以期提供像 MySQL 一 样的功能。 它支持大部分 MySQL 语法,同时使用相 似的 database / table / view 约定组织数据集。 Hive 内 部机制是基于 MapReduce,从而导致了计算过程中 消耗大量的 I/ O,降低了运行效率。 Impala [22] 是由 Cloudera 构建的一个针对 Hadoop 的开源的 MPP (massively parallel processing)“交互式” SQL 查询引 擎。 Impala 同样提供了一种 SQL 查询方法,如图 2 中的 Impala 架 构 部 分 所 示, 与 Hive 不 同 的 是, Impala 并没有使用 MapReduce 执行查询,而是使用 了自己的执行守护进程操作本地磁盘文件。 由于 没有 MapReduce 开销以及磁盘 I/ O、查询语句编译 等一系列优化,Impala 通常要比 Hive 具有更快的数 据访问性能[23] 。 Impala 共享 Hive 的 metastore,可直 接与 Hive 管理的数据互操作。 Spark [24]使用轻量级 的线程作为执行器,减少了执行作业的开销,同时 提高了调度的响应速度,如图 2 中的 Spark 部分所 示。 Spark SQL 是在 Spark 之上搭建的 SQL 查询引 擎,支持在 Spark 中使用 Sql、HiveSql、Scala 中的关 系型查询表达式。 图 2 Hadoop、Hive、Impala 与 Spark 执行结构图 Fig.2 Structure for implementation of Hadoop, Hive, Impala and Spark 2.3 结构化数据存储与压缩 目前,很多研究者提出了在 Hadoop 中优化结构 化数据存储的方法。 He 等[25]提出的 RCFile 格式旨 在提高数据导入和处理效率。 它首先将数据水平 分割为多个行组(row⁃group),然后对每个组内的数 据垂直分割成列存储。 列存储将数据表同一列的 数据连续存放,当查询只涉及部分列时,可大幅减 少所需读取的数据量。 ORC(optimized RCFile)是对 RCFile 的改进,解决其在数据类型和性能上的多个 局限性, 改 善 查 询 和 空 间 利 用 效 率。 Parquet 是 Hadoop 生态圈中一种新型列式存储格式,灵感来自 于 2010 年 Google 发表的 Dremel 论文[26] ,它可以兼 容 Hadoop 生态圈中大多数生态框架(Hadoop、Spark 等),被多种查询引擎支持(Hive、Impala、Spark SQL、 Drill 等),并且它与语言和平台无关的。 表 1 比较 了本文 2.2 节描述的 3 种查询引擎从 HDFS 上读取 多种格式的数据格式的支持。 Text 是原始的文本数 据,通常为 CSV 或其他特定字符分隔。 Hive 的格式 支持更为全面,由于 Impala 和 Hive 共享 metastore, 因此本文平台实际应用中通常由 Hive 导入数据而 后台使用 Spark SQL 查询。 表 1 Hive、Impala 和 Spark SQL 数据格式支持比较 Table 1 Data format comparison of Hive, Impala and Spark SQL 数据 格式 Hive Impala Spark SQL 查询 插入 查询 插入 查询 插入 Text √ √ √ √ √ √ RCFile √ √ √ — — — ORC √ √ — — — — Parquet √ √ √ √ √ √ ·720· 智 能 系 统 学 报 第 12 卷
第5期 何明,等:基于SQL-on-Hadoop查询引擎的日志挖掘及其应用 ·721. 数据压缩是另一种性能优化方法。压缩一方 DSparkConf conf new SparkConf(); 面节省存储空间,另一方面在相同磁盘/0速度可 ②创建上下文对象; 读写更多记录。Hive、Impala和Spark SQL均支持直 3StreamingContext(conf,Interval); 接查询压缩的数据文件,常用压缩算法有Gzip/ZIib 4Map<E,T>Offsets=kafka.getOffset(); 和侧重于解压缩速度的Snappy。ORC格式本身已 ⑤获取kafka读取偏移量; 内嵌轻量级的压缩机制。 ⑥DStream stream; 2.4结构化数据处理算法 7KafkaUtils.createDStream(input); RDD数据集包含对父RDD的一组依赖,这种依 ⑧Return streamo 赖描述了RDD之间的传承关系。RDD将操作分为 2)RDD数据处理 两类:Transformation与Action。Transformation操作 ①stream.foreachRDD; 不执行运算,只有当Action操作时才触发运算。在 2new VoidFunction<RDD>>(); RDD的实现机制中,基于迭代器的接口实现原理使 3call(RDD<MessageAndMetadata>rdd); 得数据的访问更加高效,同时避免了大量中间结果 4HasOffsetRanges offrange rdd.rdd(); 对内存的消耗。Spark SQL包含了结构化数据和数 ⑤合并请求应答,并解析存储数据: 据之上进行运算的更多信息,Spark SQL使用这些信 6rdd.mapPartitionsToPair; 息进行优化,使得结构化数据的操作更加高效和方 7 new FlumeKafkaFunction(); 便,基于Spark SQL的数据操作流程如下。 8foreachPartition(ProceFunction()); 算法1 SparkSQLonRdd(<input>,<context>:) 9kafka.setOffset(offrange); 输入Kafka输入数据流input,Spark上下文 ①保存kafka读取偏移量。 context; 3)ProceFunction数据后处理 输出分布式集合dataframe。 DIterator<Tuple2<T,KafkaData>>iter; 1)DStream line:Kafka->DStream(input); 2while (iter.hasNext()); 2)获取Kaka流数据输入; 3KafkaData data iter.next()._2(); 3)SglContext sc new SqlContext(context); 4json =data.getData(); 4)DStream<Row>rdd=line.map; 5Record record =Object(json,class); 5)new Function: 6record.setCollect_time; 6)public Row call(T); 7data.getExtData(TIME)); 7)创建Row对象; 8Utils.save(item_topic,record); 8)List <StructField sf new;List <StructField> ⑨Return record.. (): 其中,RDD根据数据记录的key对结构进行分 9)Struct Fields.add CreateDataType (Column 区。分片数据采用迭代器Iterator流式访问,hasNext >)): 方法是由RDD lineage上各个Transformation携带的 10)重复步骤9)创建逻辑表结构: 闭包函数复合而成,使得对象被序列化,通过网络 11)Struct Type st:DataTypes.CreateStructType 传输到其他节点上进行装载运算。Iterator每访问 (sf); 一个元素,就对该元素应用相应的复合函数,得到 12)DataFrame df 的结果再流式地存储。 13)sc->DataFrame(rdd,st); 3平台架构与集群环境部署 14)df.RegisterTable(<Table Name>); 15)DataFrame dataframe=sc.sql(<Sql Query>); 3.1平台架构与处理框架 16)Return dataframe. 本文基于Hadoop,构建证券交易应用服务器和 算法2 RddProcessing(<input>) 网络设备海量日志采集、解析、存储与实时计算分 输入Kafka输入数据流input 析平台,平台的核心架构如下。 输出数据集对象record。 1)数据采集层:负责实时采集来自通达信、恒 1)数据采集与预处理 生、核新的网上交易应用服务器全天24小时的客户
数据压缩是另一种性能优化方法。 压缩一方 面节省存储空间,另一方面在相同磁盘 I/ O 速度可 读写更多记录。 Hive、Impala 和 Spark SQL 均支持直 接查询压缩的数据文件,常用压缩算法有 Gzip / Zlib 和侧重于解压缩速度的 Snappy。 ORC 格式本身已 内嵌轻量级的压缩机制。 2.4 结构化数据处理算法 RDD 数据集包含对父 RDD 的一组依赖,这种依 赖描述了 RDD 之间的传承关系。 RDD 将操作分为 两类:Transformation 与 Action。 Transformation 操作 不执行运算,只有当 Action 操作时才触发运算。 在 RDD 的实现机制中,基于迭代器的接口实现原理使 得数据的访问更加高效,同时避免了大量中间结果 对内存的消耗。 Spark SQL 包含了结构化数据和数 据之上进行运算的更多信息,Spark SQL 使用这些信 息进行优化,使得结构化数据的操作更加高效和方 便,基于 Spark SQL 的数据操作流程如下。 算法 1 SparkSQLonRdd(<input>,<context>) 输入 Kafka 输入数据流 input, Spark 上下文 context; 输出 分布式集合 dataframe。 1)DStream line:Kafka->DStream(input); 2)获取 Kafka 流数据输入; 3)SqlContext sc = new SqlContext(context); 4)DStream<Row> rdd = line.map; 5)new Function; 6)public Row call(T) {}; 7)创建 Row 对象; 8)List < StructField > sf = new;List < StructField > (); 9) Struct Fields. add ( CreateDataType ( < Column >)); 10)重复步骤 9)创建逻辑表结构; 11) Struct Type st: DataTypes. CreateStructType (sf); 12)DataFrame df : 13)sc->DataFrame(rdd, st); 14)df.RegisterTable(<Table Name>); 15)DataFrame dataframe = sc.sql(<Sql Query>); 16)Return dataframe。 算法 2 RddProcessing(<input>) 输入 Kafka 输入数据流 input; 输出 数据集对象 record。 1)数据采集与预处理 ①SparkConf conf = new SparkConf(); ②创建上下文对象; ③StreamingContext(conf, Interval); ④Map<E,T> Offsets = kafka.getOffset(); ⑤获取 kafka 读取偏移量; ⑥DStream stream; ⑦KafkaUtils.createDStream(input); ⑧Return stream。 2)RDD 数据处理 ①stream.foreachRDD; ②new VoidFunction<RDD>>(); ③call(RDD<MessageAndMetadata> rdd); ④HasOffsetRanges offrange = rdd.rdd(); ⑤合并请求应答,并解析存储数据; ⑥rdd.mapPartitionsToPair; ⑦ new FlumeKafkaFunction(); ⑧foreachPartition(ProceFunction()); ⑨kafka.setOffset(offrange); ⑩保存 kafka 读取偏移量。 3)ProceFunction 数据后处理 ①Iterator<Tuple2<T, KafkaData>> iter; ②while (iter.hasNext()); ③KafkaData data = iter.next()._2(); ④json = data.getData(); ⑤Record record =Object(json, class); ⑥record.setCollect_time; ⑦data.getExtData(TIME)); ⑧Utils.save(item_topic, record); ⑨Return record。 其中,RDD 根据数据记录的 key 对结构进行分 区。 分片数据采用迭代器 Iterator 流式访问,hasNext 方法是由 RDD lineage 上各个 Transformation 携带的 闭包函数复合而成,使得对象被序列化,通过网络 传输到其他节点上进行装载运算。 Iterator 每访问 一个元素,就对该元素应用相应的复合函数,得到 的结果再流式地存储。 3 平台架构与集群环境部署 3.1 平台架构与处理框架 本文基于 Hadoop,构建证券交易应用服务器和 网络设备海量日志采集、解析、存储与实时计算分 析平台,平台的核心架构如下。 1)数据采集层:负责实时采集来自通达信、恒 生、核新的网上交易应用服务器全天 24 小时的客户 第 5 期 何明,等:基于 SQL⁃on⁃Hadoop 查询引擎的日志挖掘及其应用 ·721·