kafka消息队列

  |  

Kafka 是由 Apache 软件基金会开发的一个开源流处理平台,由 Scala 和 Java 编写。Kafka 是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像 Hadoop 一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka 的目的是通过 Hadoop 的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

zookeeper 安装

1
2
3
4
5
6
7
docker pull mesoscloud/zookeeper:3.4.8

docker run -p 2181:2181 \
--name zookeeper \
-v /data/docker/zookeeper/conf:/opt/zookeeper/conf \
-v /data/docker/zookeeper/data:/tmp/zookeeper \
-d mesoscloud/zookeeper:3.4.8

kafka 安装

1
2
3
4
5
6
7
8
9
10
11
12
docker pull wurstmeister/kafka:2.11-2.0.0

192.168.10.160 dev服务器ip

docker run -d --name kafka \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-v /data/docker/kafka/logs:/opt/kafka_2.11-2.0.0/logs \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.10.160:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.10.160:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-d wurstmeister/kafka:2.11-2.0.0

nodejs 使用

yarn add kafka-node

Kafka 的优势

高吞吐量、低延迟:kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒;

可扩展性:kafka 集群支持热扩展;

持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失;

容错性:允许集群中节点故障(若副本数量为 n,则允许 n-1 个节点故障);

高并发:支持数千个客户端同时读写。

Kafka 适合以下应用场景

日志收集:一个公司可以用 Kafka 可以收集各种服务的 log,通过 kafka 以统一接口服务的方式开放给各种 consumer;

消息系统:解耦生产者和消费者、缓存消息等;

用户活动跟踪:kafka 经常被用来记录 web 用户或者 app 用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到 kafka 的 topic 中,然后消费者通过订阅这些 topic 来做实时的监控分析,亦可保存到数据库;

运营指标:kafka 也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告;

流式处理:比如 spark streaming 和 storm。

与常用 Message Queue 对比

1.RabbitMQ

RabbitMQ 是使用 Erlang 编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量级,更适合于企业级的开发。同时实现了 Broker 构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。

2.Redis

Redis 是一个基于 Key-Value 对的 NoSQL 数据库,开发维护很活跃。虽然它是一个 Key-Value 数据库存储系统,但它本身支持 MQ 功能,所以完全可以当做一个轻量级的队列服务来使用。对于 RabbitMQ 和 Redis 的入队和出队操作,各执行 100 万次,每 10 万次记录一次执行时间。测试数据分为 128Bytes、512Bytes、1K 和 10K 四个不同大小的数据。实验表明:入队时,当数据比较小时 Redis 的性能要高于 RabbitMQ,而如果数据大小超过了 10K,Redis 则慢的无法忍受;出队时,无论数据大小,Redis 都表现出非常好的性能,而 RabbitMQ 的出队性能则远低于 Redis。

3.ZeroMQ

ZeroMQ 号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZeroMQ 能够实现 RabbitMQ 不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这 MQ 能够应用成功的挑战。ZeroMQ 具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演这个服务器角色。你只需要简单的引用 ZeroMQ 程序库,可以使用 NuGet 安装,然后你就可以愉快的在应用程序之间发送消息了。但是 ZeroMQ 仅提供非持久性的队列,也就是说如果宕机,数据将会丢失。其中,Twitter 的 Storm 0.9.0 以前的版本中默认使用 ZeroMQ 作为数据流的传输(Storm 从 0.9 版本开始同时支持 ZeroMQ 和 Netty 作为传输模块)。

4.ActiveMQ

ActiveMQ 是 Apache 下的一个子项目。 类似于 ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于 RabbitMQ,它少量代码就可以高效地实现高级应用场景。

5.Kafka/Jafka

Kafka 是 Apache 下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统,而 Jafka 是在 Kafka 之上孵化而来的,即 Kafka 的一个升级版。具有以下特性:快速持久化,可以在 O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到 10W/s 的吞吐速率;完全的分布式系统,Broker、Producer、Consumer 都原生自动支持分布式,自动实现负载均衡;支持 Hadoop 数据并行加载,对于像 Hadoop 的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka 通过 Hadoop 的并行加载机制统一了在线和离线的消息处理。Apache Kafka 相对于 ActiveMQ 是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。

Kafka 缺点

  1. 由于是批量发送,数据并非真正的实时;
  2. 对于 mqtt 协议不支持;
  3. 不支持物联网传感数据直接接入;
  4. 仅支持统一分区内消息有序,无法实现全局消息有序;
  5. 监控不完善,需要安装插件;
  6. 依赖 zookeeper 进行元数据管理;

配置日志自动删除

docker exec -it kafka bash
cd /opt/kafka_2.11-2.0.0/config
vi server.properties

修改时间

log.retention.hours=1

修改大小

log.retention.bytes=1048576

设置

log.segment.bytes=10240

添加设置

log.cleanup.policy=delete

按照这个配置文件,只保存一小时的日志和大小不超过 1M,当有一个条件满足,就触发删除日志操作。log.segment.bytes=10240 表示每个文件的大小不超过 10K。

目前只是设置了时间 3 天删除日志

在这里插入图片描述

使用

  1. 发送者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
'use strict';
let kafka = require('kafka-node'),
Producer = kafka.Producer,
// KeyedMessage = kafka.KeyedMessage,
client = new kafka.KafkaClient({ 'kafkaHost': '127.0.0.1:9092' }),
producer = new Producer(client),
// km = new KeyedMessage('key', 'message'),
payloads = [
{ 'topic': 'topic2', 'messages': JSON.stringify({
'key': 'test',
}), 'partition': 0 }, // 发送一条消息
// { 'topic': 'topic2', 'messages': [ 'hello', 'world', km ] }, // 同时发送多条消息
];

(() => {

// 生产者


producer.on('ready', function() {
producer.send(payloads, function(err, data) {
console.log('data===============>', data);
});
});

producer.on('error', function(err) { });


})();
  1. 接收者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
'use strict';
let kafka = require('kafka-node'),
Consumer = kafka.Consumer,
client = new kafka.KafkaClient({ 'kafkaHost': '127.0.0.1:9092' });
(async () => {
let options = {
'groupId': 'kafka-node-group', // 消费者组ID,默认为`kafka-node-group`
'autoCommit': true, // 自动提交配置
'autoCommitIntervalMs': 5000,
// 最大等待时间是发出请求时没有足够数据时阻止等待的最长时间(以毫秒为单位),默认为100ms
'fetchMaxWaitMs': 100,
// 这是给出响应所必需的消息的最小字节数,默认为1字节
'fetchMinBytes': 1,
// 要包含在此分区的消息集中的最大字节。这有助于限制响应的大小。
'fetchMaxBytes': 1024 * 1024,
// 如果设置为true,则消费者将从有效负载中的给定偏移量获取消息
'fromOffset': false,
// 如果设置为'buffer',则值将作为原始缓冲区对象返回。
'encoding': 'utf8',
'keyEncoding': 'utf8',
};
let consumer = new Consumer(
client,
[
{ 'topic': 'topic2', 'partition': 0 },
],
{
// 自动提交配置 (false 不会提交偏移量,每次都从头开始读取)
'autoCommit': true,
'autoCommitIntervalMs': 5000,
// 如果设置为true,则consumer将从有效负载中的给定偏移量中获取消息
'fromOffset': false,
}
);

consumer.on('message', function(message) {
console.log(message);
});

// 关闭接收订阅消息
// consumer.close(true, function(e) {
// console.log(e);
// });
})();

文档

kafka

kafka-node