欢迎光临
免费的PDF电子书下载网站

大数据实时计算与应用 PDF下载

编辑推荐

本书综合了大数据离线分析所需的主流技术Storm Apache HBase Zookeeper Kafka,并配以案例和丰富的辅助学习资源,足以满足广大学习者入门的需要。 ;

内容简介

本书定位于大数据专业核心技术——实时计算,重点讨论大数据应用场景中的数据特点和应用需求的实时流计算技术。 本书通过对分布式实时计算系统的分析,将学习部分按功能性质划分成四个模块,分别为Kafka数据流处理模块、Strom实时计算模块、HBase数据存储模块和Zookeeper分布式协调模块。对此四个工作模块进行教学化处理,形成HBase基础操作、Zookeeper集群管理、配置Storm集群等核心课程体系,并配以实例使学习者便于理解,易于上手,掌握实时计算Storm相关的基础知识和实际业务系统的开发能力。 本书主要针对具有一定软件编程基础(特别是数据技术)的学生和专业工程师,特别是数据科学、数据分析专业的高年级本科学生以及从事与数据相关的高级技术人员的读者人群。

作者简介

暂无

大数据实时计算与应用 PDF下载

目录

目录

 ;

第1章分布式实时计算系统

 ;

1.1分布式的概念

 ;

1.1.1分布式系统

 ;

1.1.2分布式计算

 ;

1.2分布式通信

 ;

1.2.1分布式通信基础

 ;

1.2.2消息队列

 ;

1.2.3Storm计算模型

 ;

1.3分布式实时计算系统架构

 ;

1.3.1数据获取——Kafka

 ;

1.3.2数据处理——Storm

 ;

1.3.3数据存储——HBase

 ;

1.4系统架构

 ;

本章小结

 ;

习题

 ;

第2章初识Kafka

 ;

2.1什么是Kafka

 ;

2.1.1Kafka概述

 ;

2.1.2使用场景

 ;

2.1.3Kafka基本特性

 ;

2.1.4性能

 ;

2.1.5总结

 ;

2.1.6Kafka在LinkedIn中的应用

 ;

2.2Topics和logs

 ;

2.3分布式——consumers和producers

 ;

本章小结

 ;

习题

 ;

第3章Kafka环境搭建

 ;

3.1服务器搭建

 ;

3.2开发环境搭建

 ;

本章小结

 ;

习题

 ;

第4章Kafka消息传送

 ;

4.1消息传输的事务定义

 ;

4.2性能优化

 ;

4.2.1消息集

 ;

4.2.2数据压缩

 ;

4.3生产者和消费者

 ;

4.3.1Kafka生产者的消息发送

 ;

4.3.2Kafka consumer

 ;

4.4主从同步

 ;

4.5客户端API

 ;

4.5.1Kafka producer API

 ;

4.5.2Kafka consumer API

 ;

4.6消息和日志

 ;

本章小结

 ;

习题

 ;

 ;

 ;

 ;

 ;

 ;

第5章Zookeeper开发

 ;

5.1Zookeeper的来源

 ;

5.2Zookeeper基础

 ;

5.2.1基本概念

 ;

5.2.2Zookeeper架构

 ;

5.3Zookeeper的API

 ;

5.3.1建立会话

 ;

5.3.2管理权

 ;

5.3.3节点注册

 ;

5.3.4任务队列化

 ;

5.4状态变化处理

 ;

5.5故障处理

 ;

5.6Zookeeper集群管理

 ;

5.6.1集群配置

 ;

5.6.2集群管理

 ;

本章小结

 ;

习题

 ;

第6章初识HBase

 ;

6.1什么是HBase

 ;

6.1.1大数据的背景

 ;

6.1.2HBase架构

 ;

6.1.3HBase存储API

 ;

6.2HBase部署

 ;

6.2.1HBase配置及安装

 ;

6.2.2运行模式

 ;

6.2.3集群操作

 ;

本章小结

 ;

习题

 ;

第7章HBase基础操作

 ;

7.1CRUD操作

 ;

7.1.1Put操作

 ;

7.1.2Get操作

 ;

7.1.3Delete操作

 ;

7.2批处理操作

 ;

7.3行锁

 ;

7.4扫描

 ;

7.5其他操作

 ;

7.5.1HTable方法

 ;

7.5.2Bytes方法

 ;

本章小结

 ;

习题

 ;

第8章HBase高阶特性

 ;

8.1过滤器

 ;

8.1.1什么是过滤器

 ;

8.1.2比较过滤器

 ;

8.1.3专用过滤器

 ;

8.1.4附加过滤器

 

8.2计数器

 

8.2.1什么是计数器

 

8.2.2单计数器及多计数器

 

8.3协处理器

 

8.3.1什么是协处理器

 

8.3.2协处理器API应用

 

本章小结

 

习题

 

第9章管理HBase

 

9.1HBase数据描述

 

9.1.1表

 

9.1.2列簇

 

9.1.3属性

 

9.2表管理API

 

9.2.1基础操作

 

9.2.2集群管理

 

本章小结

 

习题

 

第10章初识Storm

 

10.1什么是Storm

 

10.1.1Storm能做什么

 

10.1.2Storm的特性

 

10.1.3Storm分布式计算结构

 

10.2构建topology

 

10.2.1Storm的基本概念

 

10.2.2构建topology

 

10.2.3示例: 单词计数

 

10.3Storm并发机制

 

10.3.1topology并发机制

 

10.3.2给topology增加Worker

 

10.3.3配置Executor和task

 

10.4数据流分组的理解

 

10.5消息的可靠处理

 

10.5.1消息被处理后会发生什么

 

10.5.2Storm可靠性的实现方法

 

10.5.3调整可靠性

 

本章小结

 

习题

 

第11章配置Storm集群

 

11.1Storm集群框架介绍

 

11.1.1理解nimbus守护进程

 

11.1.2supervisor守护进程的工作方式

 

11.1.3DRPC服务工作机制

 

11.1.4Storm的UI简介

 

11.2在Linux上安装Storm

 

11.2.1搭建Zookeeper集群

 

11.2.2安装Storm依赖库

 

11.2.3下载并解压Storm发布版本

 

11.2.4修改storm.yaml配置文件

 

11.2.5启动Storm后台进程

 

11.3将topology提交到集群上

 

本章小结

 

习题

 

第12章Trident和TridentML

 

12.1Trident topology

 

12.1.1Trident综述

 

12.1.2Reach

 

12.1.3字段和元组

 

12.1.4状态

 

12.1.5Trident topology的执行

 

12.2Trident接口

 

12.2.1综述

 

12.2.2本地分区操作

 

12.2.3重新分区操作

 

12.2.4群聚操作

 

12.2.5流分组操作

 

12.2.6合并和连接

 

12.3Trident状态

 

12.3.1事务spouts

 

12.3.2透明事务spouts

 

12.3.3非事务spouts

 

12.3.4Spout和State总结

 

12.3.5State应用接口

 

12.3.6MapState的更新

 

12.3.7执行MapState

 

12.4TridentML: 基于storm的实时在线机器学习库

 

本章小结

 

习题

 

第13章DRPC模式

 

13.1DRPC概述

 

13.2DRPC自动化组件

 

13.3本地模式DRPC

 

13.4远程模式DRPC

 

13.5一个更复杂的例子

 

本章小结

 

习题

 

第14章Storm实战

 

14.1网站页面浏览量计算

 

14.1.1背景介绍

 

14.1.2体系结构

 

14.1.3项目相关介绍

 

14.1.4Storm编码实现

 

14.1.5运行topology

 

14.2网站用户访问量计算

 

14.2.1背景介绍

 

14.2.2Storm代码实现

 

14.2.3运行topology

 

本章小结

 

习题

 

参考文献

 

 

前沿

前言

为什么要写这本书由于目前对信息高时效性、可操作性需求的不断增长,软件系统需要在更少的时间内处理更多的数据。随着可连接设备的不断增加以及在各行各业的广泛应用,这种需求已经无处不在。传统企业的运营系统被迫需要处理原先只有在互联网公司才会遇到的海量数据。这种转变正在不断改变传统的架构和解决方案,将在线事务处理和离线分析分隔开。与此同时,人们正在重新思考从数据中提取信息的意义和价值。计算系统的框架和基础设施也在逐步进化,以适应这种新场景。具体来说,数据的生成可以看作是一连串发生的离散事件,这些事件会伴随着不同的数据流、操作和分析,最后交由一个通用的实时计算处理系统进行处理。一个成熟的实时计算处理框架主要包括四个模块: 数据获取模块、数据传输模块、数据存储模块和数据处理模块。作为现在流行的实时计算处理框架,Storm提供了可容错分布式计算所需的基本原语和保障机制,可以满足大容量的关键业务应用的需求。它不但是一套技术的整合,也是一种数据流和控制的机制。很多大公司都将Storm作为大数据处理平台的核心部分。同样,由于通用关系型数据库在数据剧增时会出现系统扩展性和延迟的问题,因此业界出现了一类面向半结构化数据存储和处理的高可扩展、低写入/查询延迟的系统,例如键值存储系统、文档存储系统和类BigTable存储系统等,这些系统统称为NoSQL系统。Apache HBase就是其中已迈向实用的成熟系统,并已成功应用于互联网服务领域和传统行业的众多在线式数据分析处理系统中。然而,分布式的构建并不容易。人们日常使用的应用大多基于分布式系统,在短时间内分布式系统的现状并不会改变。Apache Zookeeper旨在减轻构建健壮的分布式系统的任务。Zookeeper基于分布式计算的核心概念而设计,主要给开发人员提供一套容易理解和开发的接口,从而简化分布式系统构建的任务。近年来,活动和运营数据处理已经成为网站软件产品特性中一个至关重要的组成部分,需要一个更加复杂的基础设施对其提供支持。Kafka作为一个分布式的消息系统,以可水平扩展和高吞吐率而被广泛使用,Kafka的目的是提供一个发布订阅解决方案,它可以处理消费者规模网站中的所有动作流数据,即通过集群机来提供实时的消费。本书对实时计算系统进行了全面的介绍,章节组织由浅入深,内容阐述细致入微且贴近实际,可以作为参考书以方便读者在开发过程中随时查阅。我相信,本书对实时计算系统的使用者和开发者来说都是及时和不可或缺的。读者对象本书适合以下读者阅读。(1) 大数据技术的学习者和爱好者。(2) 有Java基础的开发者。(3) 大数据实时计算技术开发者。(4) 实时计算集群维护者。(5) 分布式实时计算系统相关维护人员。如何阅读本书本书共分为五个部分。第一部分为简介。简介部分为第1章,主要介绍了分布式实时计算系统的相关知识,从分布式的基本概念到分布式通信的原理,最后引出分布式实时计算架构的四个模块——Kafka、Storm、Zookeeper和HBase。

第二部分为数据获取模块Kafka的相关介绍,包括第2章~第4章。本部分介绍了Kafka的相关基础知识和应用知识,让读者了解Kafka的结构、环境搭建方式以及消息传输方式等。本部分首先介绍了Kafka的基本概念,引出了Kafka的基本特性以及Kafka分布式系统架构中关于生产者和消费者的介绍。随后介绍了Kafka的环境搭建方法,最后介绍了Kafka消息传送方面的知识,包括性能优化、主从同步以及客户端API等信息,同时解释了消息和日志方面的相关概念。第三部分为数据调度模块Zookeeper的相关介绍,包括第5章。本部分讲解了Zookeeper相关的基础知识和开发知识,让读者了解Zookeeper的来源、性质及基本概念、Zookeeper开发的应用方法及实现方式、Zookeeper集群的配置及管理方法等。本部分首先介绍了分布式协作存在的三大难点,引出了FLP定律和CAP定律。接着从Zookeeper的Znode类型、通知机制、Lead选择方法等方面介绍Zookeeper的基本概念。随后介绍了Zookeeper的两种运行模式、架构及其应用场景,并详细介绍了Zookeeper可调用的多种API用法,包含会话建立、管理权获取、节点注册、任务队列化等。最后介绍了Zookeeper集群管理的需求及方法,同时解释了动态选举的过程。第四部分为数据存储模块HBase的相关介绍,包括第6章~第9章。本部分首先介绍了HBase的架构以及存储API,然后介绍了HBase的基础操作,包括put、get、delete操作,批处理操作以及HTable、Bytes等其他操作。随后介绍了HBase的高阶特性,包括过滤器、计数器、协处理器等。最后介绍了HBase管理部分的内容,包括HBase的数据描述方式以及表管理API等。第五部分为数据处理模块Storm的相关介绍,包括第10章~第14章。本部分首先对Storm的基本概念进行介绍,包括Storm的基本特性、topology的构建方式、Storm的并发机制以及数据流分组等相关知识。随后介绍了在Linux上配置Storm集群的相关方法以及如何将topology提交到Storm集群上运行。从Trident的topology、接口、状态等方面介绍了trident的相关知识,同时介绍了一种基于Storm的实时在线机器学习库——TridentML,从各个组件对DRPC进行介绍。最后通过两个具体的Storm项目实例让读者对Storm有更深刻的理解。编者2018年5月

免费在线读

第3章

Kafka环境搭建

3.1服务器搭建Kafka服务器的搭建可按以下步骤进行。(1) 下载Kafka。下载最新的版本并解压。
>tar -xzf kafka_2.9.2-0.8.1.1.tgz
>cd kafka_2.9.2-0.8.1.1
(2) 启动服务。Kafka用到了Zookeeper,所以首先应启动Zookeeper。下面启用一个单实例的Zookeeper服务。可以在命令的结尾加&符号,这样就可以启动服务后离开控制台。
>bin/zookeeper-server-start.sh config/zookeeper.properties &
[2013-04-22 15:01:37,495] INFO Reading configuration from:config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)

现在启动Kafka: 
>bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)

(3) 创建Topic。创建一个叫作test的Topic,它只有一个分区,一个副本。
>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

可以通过list命令查看创建的Topic。
>bin/kafka-topics.sh --list --zookeeper localhost:2181
test

除了手动创建Topic,还可以配置broker来自动创建Topic。(4) 发送消息。Kafka使用一个简单的命令行producer,从文件或者标准输入中读取消息并发送到服务端。默认每条命令将发送一条消息。运行producer,并在控制台输入一些消息,这些消息将被发送到服务端。
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message This is another message
按Ctrl C组合键可以退出发送。
(5) 启动consumer。Kafka有一个命令行consumer,可以读取消息并输出到标准输出。
>bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message
在一个终端运行consumer命令行,在另一个终端运行producer命令行,就可以在一个终端输入消息,在另一个终端读取消息。

大数据
实时计算与应用

0

第3章

Kafka环境搭建

0

这两个命令都有自己的可选参数,可以在运行时不加任何参数就看到帮助信息。(6) 搭建一个具有多个broker的集群。刚才只是启动了单个broker,现在启动由3个broker组成的集群,这些broker节点都是在本机上。首先为每个节点编写配置文件。
>cp config/server.properties config/server-1.properties
>cp config/server.properties config/server-2.properties

在复制出的新文件中添加以下参数。
config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
broker.id在集群中唯一标注一个节点,因为在同一个机器上,所以必须制定不同的端口和日志文件,避免数据被覆盖。刚才已经启动Zookeeper和一个节点,现在启动另外两个节点。
>bin/kafka-server-start.sh config/server-1.properties &

>bin/kafka-server-start.sh config/server-2.properties &

创建一个拥有3个副本的Topic。
>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

现在已经搭建了一个集群,怎么知道每个节点的信息呢?运行describe topics命令就可以了。
>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:myreplicatedtopic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0

其中,第一行是对所有分区的一个描述,且每个分区都会对应一行,因为只有一个分区,所以下面只加了一行。Leader: 负责处理消息的读和写,Leader是从所有节点随机选择的。Replicas: 列出了所有的副本节点,不管节点是否在服务中。Isr: 是正在服务中的节点。在本例中,节点1是作为Leader运行。向Topic发送消息。
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

my test message 1my test message 2^C

消费这些消息: 
>bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic

my test message 1
my test message 2

3.2开发环境搭建搭建好了Kafka的服务器,可以使用Kafka的命令行工具创建Topic,发送和接收消息。下面介绍搭建Kafka的开发环境。1. 添加依赖搭建开发环境需要引入Kafka的jar包,一种方式是将Kafka安装包中lib目录下的jar包加入项目的classpath中; 另一种方式是使用maven管理jar包依赖。创建好maven项目后,在pom.xml中添加以下依赖。


 

org.apache.kafka


 

kafka_2.10


 

0.8.0


添加依赖后会发现有两个jar包的依赖找不到。下载这两个jar包,解压后有两种选择: 第一种是使用mvn的install命令将jar包安装到本地仓库; 另一种是直接将解压后的文件夹复制到mvn本地仓库的com文件夹下,如d:\mvn。完成后目录结构如图31所示。

图31目录结构

2. 配置程序首先是一个充当配置文件作用的接口,配置了Kafka的各种连接参数。
package com.sohu.kafkademon;
public interface KafkaProperties
{
final static String zkConnect ="10.22.10.139:2181";
final static String groupId ="group1";
final static String topic ="topic1";
final static String kafkaServerURL ="10.22.10.139";
final static int kafkaServerPort =9092;
final static int kafkaProducerBufferSize =64 *1024;
final static int connectionTimeOut =20000;
final static int reconnectInterval =10000;
final static String topic2 ="topic2";
final static String topic3 ="topic3";
final static String clientId ="SimpleConsumerDemoClient";
}
Producer
package com.sohu.kafkademon;
import java.util.Properties;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
*@author leicui bourne_cui@163.com
*/
public class KafkaProducer extends Thread
{
private final kafka.javaapi.producer.Producer

producer;

private final String topic;

private final Properties props =new Properties();

public KafkaProducer(String topic)

{

props.put("serializer.class", "kafka.serializer.StringEncoder");

props.put("metadata.broker.list", "10.22.10.139:9092");

producer =new kafka.javaapi.producer.Producer

(new ProducerConfig(props));

this.topic =topic;

}

@Override

public void run() {

int messageNo =1;

while (true)

{

String messageStr =new String("Message_" messageNo);

System.out.println("Send:" messageStr);

producer.send(new KeyedMessage

(topic, messageStr));

messageNo ;

try {

sleep(3000);

} catch (InterruptedException e) {

//TODO Auto-generated catch block

e.printStackTrace();

}

}

}

}

Consumer

package com.sohu.kafkademon;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

/**

*@author leicui bourne_cui@163.com

*/

public class KafkaConsumer extends Thread

{

private final ConsumerConnector consumer;

private final String topic;

public KafkaConsumer(String topic)

{

consumer =kafka.consumer.Consumer.createJavaConsumerConnector(

createConsumerConfig());

this.topic =topic;

}

private static ConsumerConfig createConsumerConfig()

{

Properties props =new Properties();

props.put("zookeeper.connect", KafkaProperties.zkConnect);

props.put("group.id", KafkaProperties.groupId);

props.put("zookeeper.session.timeout.ms", "40000");

props.put("zookeeper.sync.time.ms", "200");

props.put("auto.commit.interval.ms", "1000");

return new ConsumerConfig(props);

}

@Override

public void run() {

Map

topicCountMap =new HashMap

();

topicCountMap.put(topic, new Integer(1));

Map

>>consumerMap =consumer.createMessageStreams(topicCountMap);

KafkaStream

stream =consumerMap.get(topic).get(0);

ConsumerIterator

it =stream.iterator();

while (it.hasNext()) {

System.out.println("receive: " new String(it.next().message()));

try {

sleep(3000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

3. 简单的发送/接收运行以下程序,可以进行简单的发送/接收消息。

package com.sohu.kafkademon;

/**

*@author leicui bourne_cui@163.com

*/

public class KafkaConsumerProducerDemo

{

public static void main(String[] args)

{

KafkaProducer producerThread =new KafkaProducer(KafkaProperties.topic);

producerThread.start();

KafkaConsumer consumerThread =new KafkaConsumer(KafkaProperties.topic);

consumerThread.start();

}

}

4. 高级别的consumer以下是比较负载的发送/接收的程序。

package com.sohu.kafkademon;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

/**

*@author leicui bourne_cui@163.com

*/

public class KafkaConsumer extends Thread

{

private final ConsumerConnector consumer;

private final String topic;

public KafkaConsumer(String topic)

{

consumer =kafka.consumer.Consumer.createJavaConsumerConnector(

createConsumerConfig());

this.topic =topic;

}

private static ConsumerConfig createConsumerConfig()

{

Properties props =new Properties();

props.put("zookeeper.connect", KafkaProperties.zkConnect);

props.put("group.id", KafkaProperties.groupId);

props.put("zookeeper.session.timeout.ms", "40000");

props.put("zookeeper.sync.time.ms", "200");

props.put("auto.commit.interval.ms", "1000");

return new ConsumerConfig(props);

}

@Override

public void run() {

Map

topicCountMap =new HashMap

();

topicCountMap.put(topic, new Integer(1));

Map

>>consumerMap =consumer.createMessageStreams(topicCountMap);

KafkaStream

stream =consumerMap.get(topic).get(0);

ConsumerIterator

it =stream.iterator();

while (it.hasNext()) {

System.out.println("receive: " new String(it.next().message()));

try {

sleep(3000);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

本 章 小 结通过本章的学习,对Kafka的搭建有了一定的了解,知道了如何搭建Kafka系统以及对一些问题的处理方式。第4章将对Kafka的结构进行介绍。习题请试着按照本章的方法在本机上搭建Kafka集群。

第4章

Kafka消息传送

4.1消息传输的事务定义之前讨论了consumer和producer是怎么工作的,现在来讨论数据传输。数据传输的事务定义通常有以下三种级别。最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输。最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输。精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都被传输一次而且仅仅被传输一次,这是大家所期望的。大多数消息系统声称可以做到“精确的一次”,但是仔细阅读这些文档可以看到里面存在误导,例如没有说明当consumer或producer失败时怎么样,或者当有多个consumer并行时怎么样,或写入硬盘的数据丢失时又会怎么样。Kafka的做法要先进一些。当发布消息时,Kafka有一个committed的概念,一旦消息被提交了,只要消息被写入的分区所在的副本broker是活动的,数据就不会丢失。关于副本的活动的概念,下节会讨论,现在假设broker是不会离线(down)的。如果producer发布消息时发生了网络错误,但又不确定是在提交之前发生的还是提交之后发生的,这种情况虽然不常见,但是必须考虑进去。现在的Kafka版本还没有解决这个问题,将来的版本正在努力尝试解决。并不是所有的情况都需要“精确的一次”这样高的级别,Kafka允许producer灵活地指定级别。如producer可以指定必须等待消息被提交的通知,或者完全异步发送消息而不等待任何通知,或者仅仅等待leader声明它拿到了消息(followers没有必要)。现在从consumer方面考虑这个问题,所有的副本都有相同的日志文件和相同的offset,consumer维护自己消费的消息的offset,如果consumer不会崩溃,则可以在内存中保存这个值,当然谁也不能保证这一点。如果consumer崩溃了,会有另外一个consumer接着消费消息,它需要从一个合适的offset继续处理。这种情况下可以有以下选择。consumer可以先读取消息,然后将offset写入日志文件中,再处理消息。这存在一种可能,在存储offset后还没处理消息就崩溃(crash)了,新的consumer继续从这个offset开始处理,那么就会有些消息永远不会被处理,这就是上面说的“最多一次”。consumer可以先读取消息,处理消息,最后记录offset,但如果在记录offset之前就崩溃了,新的consumer会重复消费一些消息,这就是上面说的“最少一次”。“精确的一次”可以通过将提交分为两个阶段来解决。保存offset后提交一次,消息处理成功之后再提交一次。但是还有更简单的做法,将消息的offset和消息被处理后的结果保存在一起。例如,用Hadoop ETL处理消息时,将处理后的结果和offset同时保存在HDFS中,这样就能保证消息和offset同时被处理了。4.2性 能 优 化Kafka在提高效率方面做了很大努力。Kafka的一个主要使用场景是处理网站活动日志,吞吐量是非常大的,每个页面都会产生很多次写操作。读方面,假设每个消息只被消费一次,读的量也是很大的,Kafka尽量使读的操作更轻量化。之前已经讨论了磁盘的性能问题,线性读写的情况下影响磁盘性能问题大约有两个方面: 太多琐碎的I/O操作和太多的字节复制。I/O问题可能发生在客户端和服务端之间,也可能发生在服务端内部的持久化的操作中。

大数据

实时计算与应用

0

第4章

Kafka消息传送

0

4.2.1消息集为了避免这些问题,Kafka建立了消息集(message set)的概念,将消息组织到一起,作为处理的单位。以消息集为单位处理消息,比以单个消息为单位处理会提升不少性能。producer把消息集一块发送给服务端,而不是一条条地发送; 服务端把消息集一次性追加到日志文件中,这样减少了琐碎的I/O操作。consumer也可以一次性请求一个消息集。另外一个性能优化是在字节复制方面。在低负载的情况下这不是问题,但是在高负载的情况下它的影响还是很大的。为了避免这个问题,Kafka使用了标准的二进制消息格式,这个格式可以在producer、broker和producer之间共享而无须做任何改动。

zero copy

broker维护的消息日志仅仅是一些目录文件,消息集以固定的格式写入日志文件中。这个格式是producer和consumer共享的,使Kafka可以一个很重要的点进行优化: 消息在网络上的传递。现代的UNIX操作系统提供了高性能的将数据从页面缓存发送到Socket的系统函数,Linux中的这个函数是sendfile()。为了更好地理解函数sendfile()的好处,先来看一般将数据从文件发送到Socket的数据流向。(1) 操作系统把数据从文件复制到内核中的页缓存。(2) 应用程序从页缓存把数据复制到自己的内存缓存。(3) 应用程序将数据写入内核中的Socket缓存。(4) 操作系统把数据从Socket缓存中复制到网卡接口缓存,从这里发送到网络。这显然是低效率的,有4次复制和两次系统调用。函数sendfile()直接将数据从页面缓存发送到网卡接口缓存,避免了重复复制,大大优化了性能。在一个多consumers的场景里,数据仅仅被复制到页面缓存一次而不是每次消费消息时都重复地进行复制,使消息以近乎网络带宽的速率发送出去。这样在磁盘层面几乎看不到任何的读操作,因为数据都是从页面缓存直接发送到网络。4.2.2数据压缩很多时候,性能的瓶颈并非CPU或者硬盘,而是网络带宽,对于需要在数据中心之间传送大量数据的应用更是如此。用户可以在没有Kafka支持的情况下各自压缩自己的消息,但是这将导致较低的压缩率,因为相比于将消息单独压缩,将大量文件压缩在一起才能起到最好的压缩效果。Kafka采用了端到端的压缩。因为有“消息集”的概念,客户端的消息可以一起被压缩后发送到服务端,并以压缩后的格式写入日志文件,以压缩的格式发送到consumer,消息从producer发出到consumer收到都是压缩的,只有在consumer使用时才被解压缩,所以叫作端到端的压缩。

4.3生产者和消费者4.3.1Kafka生产者的消息发送

producer直接将数据发送到broker的Leader(主节点),不需要在多个节点间进行分发。为了帮助producer做到这一点,所有的Kafka节点都可以及时地告知: 哪些节点是活动的,Topic目标分区的leader在哪儿。这样producer就可以直接将消息发送到目的地。客户端控制消息将被分发到哪个分区?可以通过负载均衡随机地选择,或者使用分区函数。Kafka允许用户实现分区函数,指定分区的key(关键字),将消息hash到不同的分区上(当然有需要也可以覆盖这个分区函数自己实现逻辑)。例如,如果指定的key是user id,那么同一个用户发送的消息都被发送到同一个分区上。经过分区之后,consumer就可以有目的地消费某个分区的消息。

批量发送可以有效地提高发送效率。Kafka producer的异步发送模式允许进行批量发送,先将消息缓存在内存,然后一次请求批量发送出去。这个策略可以配置,可以指定缓存的消息达到某个量时就发送出去,或者缓存了固定的时间后就发送出去(如100条消息就发送,或者每5s发送一次)。这种策略将大大减少服务端的I/O次数。既然缓存是在producer端进行的,那么当producer崩溃时,这些消息就会丢失。Kafka 0.8.1的异步发送模式还不支持回调,不能在发送出错时进行处理。Kafka 0.9可能会增加这样的回调函数。4.3.2Kafka consumerKafka consumer消费消息时,向broker发出fetch请求去消费特定分区的消息。consumer指定消息在日志中的偏移量(offset),就可以消费从这个位置开始的消息。customer拥有offset的控制权,可以向后回滚去重新消费之前的消息,这是很有意义的。Kafka最初考虑的问题是,customer应该从brokers拉取消息还是brokers将消息推送到consumer,也就是pull还是push。这方面,Kafka遵循了一种大部分消息系统共同的传统设计: producer将消息推送到broker,consumer从broker拉取消息。












大数据实时计算与应用 pdf下载声明

本pdf资料下载仅供个人学习和研究使用,不能用于商业用途,请在下载后24小时内删除。如果喜欢,请购买正版

pdf下载地址

版权归出版社和作者所有,下载链接已删除。如果喜欢,请购买正版!

链接地址:大数据实时计算与应用