编辑推荐
本书作者杨正洪是国内知名大数据专家,是华中科技大学和中国地质大学客座教授,拥有国家专利,是湖北省2013年海外引进的科技人才,受武汉市政府邀请,成立武汉市云升科技发展有限公司,在浙江和上海分别有全资子公司,在美国硅谷设有研发中心。作者在与地方政府、证券金融公司的项目合作中发现,他们对大数据技术很感兴趣,并希望从大数据技术、大数据采集、管理、分析以及可视化等方面得到指导和应用帮助。因此编写了这本大数据技术的快速入门书。本书以Hadoop和Spark框架为线索,比较全面地介绍了Hadoop技术、Spark技术、大数据存储、大数据访问、大数据采集、大数据管理、大数据分析等内容。*后还给出两个案例:环保大数据和公安大数据,供读者参考。
 ;
内容简介
从2015年开始,国内大数据市场继续保持高速的发展态势,作者在与地方政府、证券金融公司的项目合作中发现,他们对大数据技术很感兴趣,并希望从大数据技术、大数据采集、管理、分析以及可视化等方面得到指导和应用帮助。因此编写了这本大数据技术的快速入门书。 本书共12章,以Hadoop和Spark框架为线索,比较全面地介绍了Hadoop技术、Spark技术、大数据存储、大数据访问、大数据采集、大数据管理、大数据分析等内容。最后还给出两个案例:环保大数据和公安大数据,供读者参考。 本书适合大数据技术初学者,政府、金融机构的大数据应用决策和技术人员,IT经理,CTO,CIO等快速学习大数据技术。本书也可以作为高等院校和培训学校相关专业的培训教材。
作者简介
本书作者杨正洪是国内知名大数据专家,毕业于美国State University of New York at Stony Brook,在IBM公司从事大数据相关研发工作12年多。从2003~2013年,杨正洪在美国加州的IBM硅谷实验室(IBM Silicon Valley Lab)负责IBM大数据平台的设计、研发和实施,主持了保险行业、金融行业、政府行业的大数据系统的架构设计和实施。杨正洪是华中科技大学和中国地质大学客座教授,拥有国家专利,是湖北省2013年海外引进人才。受武汉市政府邀请,杨正洪于2012年12月发起成立武汉市云升科技发展有限公司,并获得东湖高新技术开发区办公场所和资金支持。目前公司在浙江和上海分别有全资子公司,在美国硅谷设有研发中心。公司的核心产品是大数据管理平台EasyDoop,并以EasyDoop为基础研发了公安大数据产品和环保大数据产品。这些产品在公安和环保行业得到成功实施,三次被中央电视台新闻联播节目播报,省部长级政府领导亲自考察,并给予了很高的评价。杨正洪参与了多项大数据相关标准的制定工作,曾受邀参与了公安部主导的“信息安全技术-大数据平台安全管理产品安全技术要求”的国家标准制定。
目录
目 ; 录
第1章 ; 大数据时代 1
1.1 ; 什么是大数据 1
1.2 ; 大数据的四大特征 2
1.3 ; 大数据的商用化 3
1.4 ; 大数据分析 5
1.5 ; 大数据与云计算的关系 5
1.6 ; 大数据的国家战略 6
1.6.1 ; 政府大数据的价值 7
1.6.2 ; 政府大数据的应用场景8
1.7 ; 企业如何迎接大数据 8
1.7.1 ; 评估大数据方案的维度9
1.7.2 ; 业务价值维度 10
1.7.3 ; 数据维度 11
1.7.4 ; 现有IT环境和成本维度 12
1.7.5 ; 数据治理维度 13
1.8 ; 大数据产业链分析 14
1.8.1 ; 技术分析 14
1.8.2 ; 角色分析 15
1.8.3 ; 大数据运营 17
1.9 ; 大数据交易 18
1.10 ; 大数据之我见 19
第2章 ; 大数据软件框架 20
2.1 ; Hadoop框架 20
2.1.1 ; HDFS(分布式文件系统) 21
2.1.2 ; MapReduce(分布式计算框架) 22
2.1.3 ; YARN(集群资源管理器) 25
2.1.4 ; Zookeeper(分布式协作服务) 28
2.1.5 ; Ambari(管理工具) 29
2.2 ; Spark(内存计算框架) 29
2.2.1 ; Scala 31
2.2.2 ; Spark SQL 32
2.2.3 ; Spark Streaming 33
2.3 ; 实时流处理框架 34
2.4 ; 框架的选择 35
第3章 ; 安装与配置大数据软件 36
3.1 ; Hadoop发行版 36
3.1.1 ; Cloudera 36
3.1.2 ; HortonWorks 37
3.1.3 ; MapR 38
3.2 ; 安装Hadoop前的准备工作 39
3.2.1 ; Linux主机配置40
3.2.2 ; 配置Java环境 41
3.2.3 ; 安装NTP和python 42
3.2.4 ; 安装和配置openssl43
3.2.5 ; 启动和停止特定服务44
3.2.6 ; 配置SSH无密码访问 44
3.3 ; 安装Ambari 和 HDP 45
3.3.1 ; 配置安装包文件 45
3.3.2 ; 安装 Ambari46
3.3.3 ; 安装和配置HDP47
3.4 ; 初识Hadoop 49
3.4.1 ; 启动和停止服务 50
3.4.2 ; 使用HDFS 51
3.5 ; Hadoop的特性 52
第4章 ; 大数据存储:文件系统 53
4.1 ; HDFS shell命令53
4.2 ; HDFS配置文件 55
4.3 ; HDFS API编程57
4.3.1 ; 读取HDFS文件内容 57
4.3.2 ; 写HDFS文件内容 60
4.4 ; HDFS API总结62
4.4.1 ; Configuration类 62
4.4.2 ; FileSystem抽象类 62
4.4.3 ; Path类 63
4.4.4 ; FSDataInputStream类 63
4.4.5 ; FSDataOutputStream类 63
4.4.6 ; IOUtils类63
4.4.7 ; FileStatus类 64
4.4.8 ; FsShell类64
4.4.9 ; ChecksumFileSystem抽象类 64
4.4.10 ; 其他HDFSAPI实例 64
4.4.11 ; 综合实例 67
4.5 ; HDFS文件格式 69
4.5.1 ; SequenceFile 70
4.5.2 ; TextFile(文本格式) 70
4.5.3 ; RCFile 70
4.5.4 ; Avro 72
第5章 ; 大数据存储:数据库 73
5.1 ; NoSQL 73
5.2 ; HBase管理 74
5.2.1 ; HBase表结构75
5.2.2 ; HBase系统架构78
5.2.3 ; 启动并操作HBase数据库 80
5.2.4 ; HBase Shell工具 82
5.3 ; HBase编程 86
5.3.1 ; 增删改查API 86
5.3.2 ; 过滤器 90
5.3.3 ; 计数器 93
5.3.4 ; 原子操作 94
5.3.5 ; 管理API 94
5.4 ; 其他NoSQL数据库 95
第6章 ; 大数据访问:SQL引擎层 97
6.1 ; Phoenix 97
6.1.1 ; 安装和配置Phoenix98
6.1.2 ; 在eclipse上开发phoenix程序 104
6.1.3 ; Phoenix SQL工具 108
6.1.4 ; Phoenix SQL 语法 109
6.2 ; Hive 111
6.2.1 Hive架构 111
6.2.2 安装Hive 112
6.2.3 Hive和MySQL的配置 114
6.2.4 Hive CLI 115
6.2.5 Hive数据类型115
6.2.6 HiveQL DDL 119
6.2.7 HiveQL DML 121
6.2.8 Hive编程 123
6.2.9 HBase集成125
6.2.10 XML和JSON数据 127
6.2.11 使用Tez 128
6.3 Pig 130
6.3.1 Pig语法 131
6.3.2 Pig和Hive的使用场景比较 134
6.4 ElasticSearch(全文搜索引擎) 136
6.4.1 全文索引的基础知识136
6.4.2 安装和配置ES138
6.4.3 ES API 140
第7章 大数据采集和导入 143
7.1 Flume 145
7.1.1 Flume架构145
7.1.2 Flume事件146
7.1.3 Flume源 147
7.1.4 Flume拦截器(Interceptor) 148
7.1.5 Flume通道选择器(Channel Selector) 149
7.1.6 Flume通道150
7.1.7 Flume接收器151
7.1.8 负载均衡和单点失败153
7.1.9 Flume监控管理153
7.1.10 Flume实例154
7.2 Kafka 155
7.2.1 Kafka架构156
7.2.2 Kafka与JMS的异同 158
7.2.3 Kafka性能考虑158
7.2.4 消息传送机制 159
7.2.5 Kafka和Flume的比较 159
7.3 Sqoop 160
7.3.1 从数据库导入HDFS160
7.3.2 增量导入 163
7.3.3 将数据从Oracle导入Hive 163
7.3.4 将数据从Oracle导入HBase 164
7.3.5 导入所有表 165
7.3.6 从HDFS导出数据 165
7.3.7 数据验证 165
7.3.8 其他Sqoop功能 165
7.4 Storm 167
7.4.1 Storm基本概念168
7.4.2 spout 169
7.4.3 bolt 171
7.4.4 拓扑 173
7.4.5 Storm总结175
7.5 Splunk 175
第8章 大数据管理平台 177
8.1 大数据建设总体架构177
8.2 大数据管理平台的必要性178
8.3 大数据管理平台的功能179
8.3.1 推进数据资源全面整合共享 179
8.3.2 增强数据管理水平180
8.3.3 支撑创新大数据分析180
8.4 数据管理平台(DMP) 180
8.5 EasyDoop案例分析182
8.5.1 大数据建模平台183
8.5.2 大数据交换和共享平台184
8.5.3 大数据云平台 185
8.5.4 大数据服务平台186
8.5.5 EasyDoop平台技术原理分析 188
第9章 Spark技术 192
9.1 Spark框架 192
9.1.1 安装Spark193
9.1.2 配置Spark194
9.2 Spark Shell 195
9.3 Spark编程 198
9.3.1 编写SparkAPI程序 198
9.3.2 使用sbt编译并打成jar包 199
9.3.3 运行程序 200
9.4 RDD 200
9.4.1 RDD算子和RDD依赖关系 201
9.4.2 RDD转换操作203
9.4.3 RDD行动(Action)操作 204
9.4.4 RDD控制操作205
9.4.5 RDD实例 205
9.5 Spark SQL 208
9.5.1 DataFrame 209
9.5.2 RDD转化为DataFrame213
9.5.3 JDBC数据源215
9.5.4 Hive数据源216
9.6 Spark Streaming 217
9.6.1 DStream编程模型 218
9.6.2 DStream操作221
9.6.3 性能考虑 223
9.6.4 容错能力 224
9.7 GraphX图计算框架224
9.7.1 属性图 226
9.7.2 图操作符 228
9.7.3 属性操作 231
9.7.4 结构操作 231
9.7.5 关联(join)操作 233
9.7.6 聚合操作 234
9.7.7 计算度信息 235
9.7.8 缓存操作 236
9.7.9 图算法 236
第10章 大数据分析 238
10.1 数据科学 239
10.1.1 探索性数据分析240
10.1.2 描述统计 241
10.1.3 数据可视化 241
10.2 预测分析 244
10.2.1 预测分析实例244
10.2.2 回归(Regression)分析预测法 246
10.3 机器学习 247
10.3.1 机器学习的市场动态248
10.3.2 机器学习分类249
10.3.3 机器学习算法251
10.4 Spark MLib 252
10.4.1 MLib架构253
10.4.2 MLib算法库253
10.4.3 决策树 257
10.5 深入了解算法 261
10.5.1 分类算法 262
10.5.2 预测算法 263
10.5.3 聚类分析 263
10.5.4 关联分析 264
10.5.5 异常值分析算法266
10.5.6 协同过滤(推荐引擎)算法 267
10.6 Mahout简介267
第11章 案例分析:环保大数据 268
11.1 环保大数据管理平台268
11.2 环保大数据应用平台269
11.2.1 环境自动监测监控服务 270
11.2.2 综合查询服务272
11.2.3 统计分析服务272
11.2.4 GIS服务 274
11.2.5 视频服务 274
11.2.6 预警服务 275
11.2.7 应急服务 276
11.2.8 电子政务服务277
11.2.9 智能化运营管理系统279
11.2.10 环保移动应用系统279
11.2.11 空气质量发布系统280
11.3 环保大数据分析系统280
第12章 案例分析:公安大数据 281
12.1 总体架构设计 281
12.2 建设内容 282
12.3 建设步骤 284
附录 1 数据量的单位级别 285
附录 2 Linux Shell常见命令 286
附录 3 Ganglia(分布式监控系统) 289
附录 4 auth-ssh脚本 290
附录 5 作者简介 292
媒体评论
评论
前沿
前 言
我们生活在大数据时代,正以前所未有的速度和规模产生数据。数据资产正成为和土地、资本、人力并驾齐驱的关键生产要素,并在社会、经济、科学研究等方面颠覆人们探索世界的方法、驱动产业间的融合与分立。
大数据是用来描述数据规模巨大、数据类型复杂的数据集,它本身蕴含着丰富的价值。比如:在金融行业,企业和个人的一些信用记录、消费记录、客户点击数据集、客户刷卡、存取款、电子银行转账、微信评论等行为数据组合为金融大数据,他们利用大数据技术为财富客户推荐产品,利用客户行为数据设计满足客户需求的金融产品,利用金融行业全局数据了解业务运营薄弱点并加快内部数据处理速度,利用决策树技术进入抵押贷款管理,利用数据分析报告实施产业信贷风险控制,利用客户社交行为记录实施信用卡反欺诈,依据客户消费习惯、地理位置、销售时间进行推荐(精准营销)。不仅仅金融行业,政府部门会根据大数据分析结果来做预算,企业也会根据大数据来进行市场策略调整。
Gartner指出,64%的受访企业表示他们正在或是即将进行大数据工作,然而其中一些企业却并不知道他们能够使用大数据做些什么。这正好印证了大数据领域的最主要的两个挑战:如何从大数据中获取价值以及如何定义大数据战略。这是本书首先需要解释的内容。
谷歌、Amazon、Facebook等全球知名互联网企业作为大数据领域的先驱者,凭借自身力量进行大数据探索,甚至在必要时创造出相关工具。这些工具目前已经被视为大数据技术的基础,其中最知名的当数MapReduce与Hadoop。Hadoop是目前处理大规模结构化与非结构数据的首选平台,它提供了分布式处理框架与开发环境。MapReduce是一种计算框架,它实现了将大型数据处理任务分解成很多单个的、可以在服务器集群中并行执行的任务,这些任务的计算结果可以合并在一起来计算最终的结果。在Hadoop问世以来的十年间,新的组件(如:Spark)层出不穷,极大地扩张了整个Hadoop生态圈。
大数据技术有别于传统数据处理工具和技术,而且大数据技术很难掌握,一般需要1-2年的反复尝试,在实际使用中解决了大量问题之后才能正确理解它。我们编写这本书的目的是,以硅谷大数据实战为基础,让读者略过那些不重要的大数据的细枝末节,通过实际的案例,帮助读者快速掌握大数据技术领域最能商用的大数据工具和软件平台,从而帮助读者轻松实施大数据方案。在本书中,我们将阐述如下最为硅谷所熟知的大数据相关技术:
l 框架:Hadoop、Spark。
l 集群管理:MapReduce、Yarn、Mesos。
l 开发语言:Java、Python、Scala、Pig、Hive、Spark SQL。
l 数据库:NoSQL、HBase、Cassandra、Impala。
l 文件系统:HDFS、Ceph。
l 搜索系统:Elastic Search。
l 采集系统:Flume、Sqoop、Kafka。
l 流式处理:Spark Streaming、Storm。
l 发行版:HortonWorks、Cloudera、MapR。
l 管理系统:Ambari、大数据管理平台。
l 机器学习:Spark MLlib、Mahout。
上面的列表也说明了,Hadoop生态圈有几十个软件组成。这些软件提供了什么功能?到底在什么情况下使用什么软件?软件之间怎么组合使用?这些问题正是本书想要回答的。本书与市场上其他大数据书籍的区别是,我们不是专注某一个软件(比如:Spark),而是阐述整个生态圈中的主流软件,通过实例让你理解这些软件是什么,在什么场合使用,相互的区别是什么。如果我们把这几十个软件比喻成几十种厨房工具,那就是让你避免拿着菜刀去削苹果,或者拿着水果刀去剁肉。
除了阐述大数据的定义、前景和各类Hadoop发行版之外,本书主要是按照大数据处理的几个大步骤来组织内容的。
(1)大数据存储:探究HDFS和HBase作为大数据存储方式的优劣。
(2)大数据访问:探究SQL引擎层中Hive、Phoenix、SparkSQL等组件的功能,并阐述了全文搜索的ElasticSearch,也探究了Spark的高速访问能力。
(3)大数据采集:大数据的采集是指接收各类数据源(比如:Web、行业应用系统或者传感器等)的数据。大数据采集的主要特点和挑战是导入的数据量大(每秒钟的导入量经常会达到百兆,甚至千兆级别)、并发数高和数据源的异构。采集端可能会有很多数据库(或文件),有时需要在导入基础上做一些简单的清洗和预处理工作。在这个部分,我们探究了Flume、Kafka、Sqoop等技术,也探究了如何使用Storm和Spark Streaming来对数据进行流式计算,来满足部分业务的实时和准实时计算需求。
(4)大数据管理:探究数据模型、安全控制、数据生命周期等数据管理内容。
(5)大数据的统计和分析:探究了如何利用分布式计算集群来对存储于其内的海量数据进行统计分析,重点探究了机器学习和Spark MLlib,也阐述了多种分析算法。
参加本书编写的同志还有:余飞、邵敏华、欧阳涛、杨正礼、王娜、李祥、刘毕操、彭勃、李招、张剑、杨磊等人。由于我们水平有限,书中难免存在纰漏之处,敬请读者批评指正。杨正洪的邮件地址为yangzhenghong@yahoo.com。
杨正洪
2016年5月于 San Jose
免费在线读
第 9 章 Spark技术
Apache Spark 是一个新兴的大数据处理通用引擎,提供了分布式的内存抽象。Spark最大的特点就是快(Lightning-Fast),可比 Hadoop MapReduce 的处理速度快 100 倍。此外,Spark 提供了简单易用的 API,几行代码就能实现 WordCount。本章介绍Spark 的框架,Spark Shell 、RDD、Spark SQL、SparkStreaming 等的基本使用。
9.1 Spark框架
Spark作为新一代大数据快速处理平台,集成了大数据相关的各种能力。Hadoop的中间数据需要存储在硬盘上,这产生了较高的延迟。而Spark基于内存计算,解决了这个延迟的速度问题。Spark本身可以直接读写Hadoop上任何格式数据,这使得批处理更加快速。
图9-1是以Spark为核心的大数据处理框架。最底层为大数据存储系统,如:HDFS、HBase等。在存储系统上面是Spark集群模式(也可以认为是资源管理层),这包括Spark自带的独立部署模式、YARN和Mesos集群资源管理模式,也可以是Amazon EC2。Spark内核之上是为应用提供各类服务的组件。Spark内核API支持Java、Python、Scala等编程语言。SparkStreaming提供高可靠性、高吞吐量的实时流式处理服务,能够满足实时系统要求;MLib提供机器学习服务,Spark SQL提供了性能比Hive快了很多倍的SQL查询服务,GraphX提供图计算服务。
图9-1 Spark 框架
从上图看出,Spark有效集成了Hadoop组件,可以基于Hadoop YARN作为资源管理框架,并从HDFS和HBase数据源上读取数据。YARN是Spark目前主要使用的资源管理器。Hadoop能做的,Spark基本都能做,而且做的比Hadoop好。Spark依然是Hadoop生态圈的一员,它替换的主要是MR的计算模型而已。资源调度依赖于YARN,存储则依赖于HDFS。
Spark的大数据处理平台是建立在统一抽象的RDD之上。RDD是弹性分布式数据集(Resilient Distributed Dataset)的英文简称,它是一种特殊数据集合,支持多种来源,有容错机制,可以被缓存,支持并行操作。Spark的一切都是基于RDD的。RDD就是Spark输入的数据。
Spark应用程序在集群上以独立进程集合的形式运行。如图9-2所示,主程序(叫做Driver程序)中的SparkContext对象协调Spark应用程序。SparkContext对象首先连接到多种集群管理器(如:YARN),然后在集群节点上获得Executor。SparkContext把应用代码发给Executor,Executor负责应用程序的计算和数据存储。
图9-2 集群模式
每个应用程序都拥有自己的Executor。Executor为应用程序提供了一个隔离的运行环境,以Task的形式执行作业。对于Spark Shell来说,这个Driver就是与用户交互的进程。
9.1.1 安装Spark
最新的Spark版本是1.6.1。它可以运行在Windows或Linux机器上。运行 Spark需要 Java JDK 1.7,CentOS 6.x 系统默认只安装了 Java JRE,还需要安装 Java JDK,并确保配置好 JAVA_HOME、PATH和CLASSPATH变量。此外,Spark 会用到 HDFS 与YARN,因此读者要先安装好 Hadoop。我们可以从Spark官方网站http://spark.apache.org/downloads.html上下载Spark,如图9-3所示。
图9-3 下载安装包
有几种Package type,分别为:
l Source code:Spark 源码,需要编译才能使用。
l Pre-build with user-provided Hadoop:“Hadoop free”版,可应用到任意 Hadoop 版本。
l Pre-build for Hadoop 2.6 and later:基于 Hadoop 2.6 的预编译版,需要与本机安装的 Hadoop 版本对应。可选的还有 Hadoop 2.4 and later、Hadoop 2.3、Hadoop 1.x,以及 CDH 4。
本书选择的是 Pre-build with user-provided Hadoop,简单配置后可应用到任意 Hadoop 版本。下载后,执行如下命令进行安装:
sudo tar -zxf spark-1.6.1-bin-without-hadoop.tgz -C /usr/local/
cd /usr/local
sudo mv ./spark-1.6.1-bin-without-hadoop/ ./spark
sudo chown -R hadoop:hadoop ./spark
9.1.2 配置Spark
安装后,进入conf目录,以spark-env.sh.template文件为模块创建spark-env.sh文件,然后修改其配置信息,命令如下:
cd /usr/local/spark
cp ./conf/spark-env.sh.template ./conf/spark-env.sh
编辑 ./conf/spark-env.sh(vim./conf/spark-env.sh),在文件的最后加上如下一行:
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
保存后,Spark 就可以启动和运行了。在./examples/src/main 目录下有一些 Spark 的示例程序,有 Scala、Java、Python、R 等语言的版本。我们可以先运行一个示例程序 SparkPi(即计算 π 的近似值),执行如下命令:
cd /usr/local/spark
./bin/run-example SparkPi
执行时会输出非常多的运行信息,输出结果不容易找到,可以通过 grep 命令进行过滤(命令中的 2>&1 可以将所有的信息都输出到 stdout 中):
./bin/run-example SparkPi 2>&1 | grep "Pi is roughly"
过滤后的运行结果为 π 的 5 位小数近似值 。
9.2 Spark Shell
以前的统计和机器学习依赖于数据抽样。从统计的角度来看,抽样如果足够随机,其实可以很精准地反应全集的结果,但事实上往往很难做到随机,所以通常做出来也会不准。现在大数据解决了这个问题,它不是通过优化抽样的随机来解决,而是通过全量数据来解决。要解决全量的数据就需要有强大的处理能力,Spark首先具备强大的处理能力,其次Spark Shell带来了即席查询。做算法的工程师,以前经常是在小数据集上跑个单机,然后看效果不错,一到全量上,就可能和单机效果很不一样。有了Spark后就不一样了,尤其是有了Spark Shell。可以边写代码,边运行,边看结果。Spark提供了很多的算法,最常用的是贝叶斯、word2vec、线性回归等。作为算法工程师,或者大数据分析师,一定要学会用Spark Shell。
Spark Shell 提供了简单的方式来学习 Spark API,也提供了交互的方式来分析数据。Spark Shell 支持 Scala 和 Python,本书选择使用 Scala 来进行介绍。Scala集成了面向对象和函数语言的特性,并运行于Java 虚拟机之上,兼容现有的 Java 程序。Scala 是Spark 的主要编程语言,如果仅仅是写 Spark 应用,并非一定要用 Scala,用Java和Python都是可以的。使用 Scala 的优势是开发效率更高,代码更精简,并且可以通过 Spark Shell 进行交互式实时查询,方便排查问题。执行如下命令启动 Spark Shell:
./bin/spark-shell
启动成功后会有“scala >”的命令提示符。这表明已经成功启动了Spark Shell。在 Spark Shell 启动时,输出日志的最后有这么几条信息:
16/04/16 17:25:47 INFO repl.SparkILoop: Created spark context..
Spark context available as sc.
这些信息表明 SparkContext已经初始化好了,可通过对应的sc变量直接进行访问。Spark 的主要抽象是分布式的数据集合RDD,它可被分发到集群各个节点上,进行并行操作。一个RDD可以通过 Hadoop InputFormats 创建(如 HDFS),或者从其他 RDDs转化而来。下面我们从 ./README 文件新建一个 RDD,代码如下:
scala>val textFile =sc.textFile("file:///usr/local/spark/README.md")
上述的sc是Spark创建的SparkContext,我们使用SparkContext对象加载本地文件README.md来创建RDD。输出结果如下:
textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at
:27
上述返回结果为一个MapPartitionsRDD文件。需要说明的是,加载HDFS文件和本地文件都是使用textFile ,区别在于前缀“hdfs://”为HDFS文件,而“file://”为本地文件。上述代码中通过“file://”前缀指定读取本地文件,直接返回MapPartitionsRDD。Spark Shell默认方式是读取HDFS中的文件。从HDFS读取的文件先转换为HadoopRDD,然后隐式转换成MapPartitionsRDD。
上面的例子使用Spark中的文本文件README.md创建一个RDD textFile,文件中包含了若干文本行。将该文本文件读入RDDtextFile时,其中的文本行将被分区,以便能够分发到集群中并行化操作。我们可以想象,RDD有多个分区,每个分区上有多行的文本内容。RDDs 支持两种类型的操作:
l actions:在数据集上运行计算后返回结果值。
l transformations:转换。从现有RDD创建一个新的RDD。
下面我们演示count()和first()操作:
scala>textFile.count() // RDD 中的 item 数量,对于文本文件,就是总行数
输出结果为:
res0: Long = 95
scala>textFile.first() // RDD 中的第一个 item,对于文本文件,就是第一行内容
输出结果为:
res1: String = # Apache Spark
上面这两个例子都是action的例子。接着演示transformation,通过 filter transformation来筛选出包含 Spark 的行,返回一个新的RDD,代码如下:
scala>val linesWithSpark = textFile.filter(line =>line.contains("Spark"))
scala>linesWithSpark.count() // 统计行数
上面的linesWithSpark RDD有多个分区,每个分区上只有包含了Spark的若干文本行。输出结果为:
res4: Long = 17
上述结果表明一共有17行内容包含“Spark”,这与通过 Linux 命令 cat ./README.md | grep"Spark" -c 得到的结果一致,说明是正确的。action 和 transformation 可以用链式操作的方式结合使用,使代码更为简洁:
scala>textFile.filter(line =>line.contains("Spark")).count() // 统计包含 Spark 的行数
RDD的actions和transformations可用在更复杂的计算中。例如,通过如下代码可以找到包含单词最多的那一行内容共有几个单词:
scala>textFile.map(line => line.split(" ").size).reduce((a, b)=> if (a > b) a else b)
输出结果为:
res5: Int = 14
上述代码将每一行文本内容使用split进行分词,并统计分词后的单词数。将每一行内容map为一个整数,这将创建一个新的 RDD,并在这个 RDD 中执行reduce操作,找到最大的数。map()、reduce()中的参数是Scala的函数字面量(function literals),并且可以使用Scala/Java的库。例如,通过使用 Math.max() 函数(需要导入Java的Math库),可以使上述代码更容易理解:
scala>import java.lang.Math
scala>textFile.map(line => line.split(" ").size).reduce((a, b)=> Math.max(a, b))
词频统计(WordCount)是HadoopMapReduce的入门程序,Spark可以更容易地实现。首先结合flatMap、map和reduceKey来计算文件中每个单词的词频:
scala>val wordCounts = textFile.flatMap(line => line.split("")).map(word => (word, 1)).reduceByKey((a, b) => a b)
输出结果为(string,int)类型的键值对ShuffledRDD。这是因为reduceByKey操作需要进行Shuffle操作,返回的是一个Shuffle形式的ShuffleRDD:
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] atreduceByKey at
:29
然后使用collect聚合单词计算结果:
scala>wordCounts.collect()
输出结果为:
res7: Array[(String, Int)] = Array((package,1), (For,2), (Programs,1),(processing,1), (Because,1), (The,1)...)
Spark 支持将数据缓存在集群的内存缓存中,当数据需要反复访问时这个特征非常有用。调用 cache(),就可以将数据集进行缓存:
scala>textFilter.cache()
9.3 Spark编程
无论Windows或Linux操作系统,都是基于Eclipse或Idea构建开发环境,通过Java、Scala或Python语言进行开发。根据开发语言的不同,我们需要预先准备好JDK、Scala或Python环境,然后在Eclipse中下载安装Scala或Python插件。
下面我们通过一个简单的应用程序 SimpleApp 来演示如何通过 Spark API 编写一个独立应用程序。不同于使用Spark Shell自动初始化的SparkContext,独立应用程序需要自己初始化一个SparkContext,将一个包含应用程序信息的SparkConf对象传递给SparkContext构造函数。对于独立应用程序,使用 Scala 编写的程序需要使用 sbt 进行编译打包,相应地,Java 程序使用 Maven 编译打包,而 Python 程序通过 spark-submit 直接提交。
在终端中执行如下命令,创建一个文件夹 sparkapp 作为应用程序根目录:
cd ~ # 进入用户主文件夹
mkdir ./sparkapp # 创建应用程序根目录
mkdir -p ./sparkapp/src/main/scala # 创建所需的文件夹结构
9.3.1 编写SparkAPI程序
在./sparkapp/src/main/scala下建立一个名为SimpleApp.scala 的文件(vim./sparkapp/src/main/scala/SimpleApp.scala),添加代码如下:
/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SimpleApp {
//使用关键字def声明函数,必须为函数指定参数类型
def main(args: Array[String]) {
val logFile ="file:///usr/local/spark/README.md" // 一个本地文件
//创建SparkConf对象,该对象包含应用程序的信息
val conf = newSparkConf().setAppName("Simple Application")
//创建SparkContext对象,该对象可以访问Spark集群
val sc = new SparkContext(conf)
val logData = sc.textFile(logFile,2).cache()
//line=>line.contains(..)是匿名函数的定义,line是参数
val numAs = logData.filter(line =>line.contains("a")).count()
val numBs = logData.filter(line =>line.contains("b")).count()
println("Lines with a: %s, Lineswith b: %s".format(numAs, numBs))
}
}
上述程序计算 /usr/local/spark/README 文件中包含 “a” 的行数和包含 “b” 的行数。不同于 Spark Shell,独立应用程序需要通过“val sc = newSparkContext(conf)”初始化 SparkContext,SparkContext 的参数 SparkConf 包含了应用程序的信息。
9.3.2 使用sbt编译并打成jar包
该程序依赖 Spark API,因此我们需要通过sbt(或mvn)进行编译打包。我们以sbt为例,创建一个包含应用程序代码的jar包。在 ./sparkapp 中新建文件 simple.sbt(vim ./sparkapp/simple.sbt),添加如下内容,声明该独立应用程序的信息以及与 Spark 的依赖关系:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies = "org.apache.spark" %% "spark-core" %"1.6.1"
文件 simple.sbt 需要指明Spark和Scala的版本。上述版本信息可以从Spark Shell获得。我们启动Spark Shell的过程中,当输出到 Spark 的符号图形时,可以看到相关的版本信息。
Spark中没有自带sbt,需要手动安装sbt,我们选择安装在/usr/local/sbt中:
sudo mkdir /usr/local/sbt
sudo chown -R hadoop /usr/local/sbt # 此处的hadoop为你的用户名
cd /usr/local/sbt
下载sbt后,拷贝至 /usr/local/sbt 中。接着在 /usr/local/sbt 中创建 sbt 脚本(vim ./sbt),添加如下内容:
#!/bin/bash
SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX: CMSClassUnloadingEnabled-XX:MaxPermSize=256M"
java $SBT_OPTS -jar `dirname $0`/sbt-launch.jar "$@"
保存后,为 ./sbt 脚本增加可执行权限:
chmod u x ./sbt
最后检验 sbt 是否可用:
./sbt sbt-version
只要能得到版本信息就说明sbt安装没问题了。接着,我们可以通过如下代码将整个应用程序打包成 JAR文件:
/usr/local/sbt/sbt package
打包成功的话,会输出“Done Packaging”信息。生成的 jar 包的位置为~/sparkapp/target/scala-2.10/simple-project_2.10-1.0.jar。
9.3.3 运行程序
一旦应用程序被打包成jar文件,就可以通过/bin/spark-submit脚本启动应用程序。将生成的jar包通过spark-submit提交到 Spark 中运行了,命令如下:
/usr/local/spark/bin/spark-submit --class "SimpleApp"
~/sparkapp/target/scala-2.10/simple-project_2.10-1.0.jar
如果你觉得输出信息太多,可以通过如下命令过滤结果信息:
/usr/local/spark/bin/spark-submit --class "SimpleApp"
~/sparkapp/target/scala-2.10/simple-project_2.10-1.0.jar 2>&1 | grep"Lines with a:"
最终得到的结果如下:
Lines with a: 58, Lines with b: 26
大数据技术入门 pdf下载声明
本pdf资料下载仅供个人学习和研究使用,不能用于商业用途,请在下载后24小时内删除。如果喜欢,请购买正版