Kafka入门
Kafka入门
简介
Kafka:是一个高吞吐量、分布式的发布-订阅消息系统。kafka是一款开源的、轻量级的、分布式、可分区和具有复制备份[Replicated]的、基于Zookeeper协调管理的分布式流平台的功能强大的消息系统。
单机吞吐量 | 时效性 | 可用性 | 消息可靠性 | 核心特点 |
---|---|---|---|---|
10万级 | 延迟在ms级内 | 非常高,kafka是分布式的,一个数据多个副本,少数机器宕机后,不会丢失数据,不会导致不可用 | 经过参数优化配置可以做到0丢失 | 功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是实时上的标准。 |
仅仅提供较少的核心功能,但是提供超高的吞吐量,ms级别的延迟,极高的可用性以及可靠性,分布式可以任意扩展。同时kafka最好是支撑较少的topic数量即可,保证其超高的吞吐量。
概念
Topic
Topic 是用于存储消息的逻辑概念,可以看作一个消息集合。每个 Topic 可以有多个生产者向其中推送(push)消息,也可以有任意多个消费者消费其中的消息
消息
Kafka通信的基本单位,消息由一串字节构成,其中主要由 key 和 value 构成,key 和 value 也都是 byte 数组。key的主要作用是根据一定的策略,将此消息路由到指定的分区中,这样就可以保证包含同一 key 的消息全部写入同一分区中,key 可以是 null。消息的真正有效负载是 value 部分的数据。
为了提高效率, 消息会
分批次
写入 Kafka,批次就代指的是一组消息。分区partitions
主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 kafka 的
伸缩性
,单一主题中的分区有序,但是无法保证主题中所有的分区有序.副本replication
每个 Partition 可以有多个副本,每个副本中包含的消息是一样的。每个分区至少有一个副本,当分区中只有一个副本时,就只有 Leader 副本,没有 Follower 副本。
消费者群组
消费者群组(Consumer Group)指的就是由一个或多个消费者组成的群体,一个群组中的消费者订阅的都是相同的主题,每个消费者接收主题一部分分区的消息。可以通过增加消费组的消费者来进行
水平扩展提升消费能力
偏移量offset
偏移量(Consumer Offset)是一种元数据,它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据。
重平衡Rebalance
消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。
安装
下载镜像
# 安装kafka需同时安装zookeeper
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
单机启动
# 启动zookeeper
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
# 启动kafka
docker run -d --name kafka \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=[局域网ip]:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://[局域网ip]:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
命令
#进入容器
docker exec -it ${CONTAINER ID} /bin/bash
cd opt/bin
#单机方式:创建一个主题
bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic mykafka
#运行一个生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mykafka
#运行一个消费者
bin/kafka-console-consumer.sh --zookeeper zookeeper:2181 --topic mykafka --from-beginning
配置
# 修改Server配置
vim opt/kafka/config/server.properties
#broker 的全局唯一编号,不能重复
broker.id=0
#服务端口,默认9092
port=9092
# 是否启用删除 topic 功能使能
delete.topic.enable=true
# 处理网络请求的线程数量
num.network.threads=3
# 用来处理磁盘 IO 的线程数量
num.io.threads=8
# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
# 接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600
# kafka 运行日志存放的路径(也是消息的存放地址)
log.dirs=/opt/module/kafka/logs
# topic 在当前 broker 上的分区个数
num.partitions=1
# 用来恢复和清理 data 下数据的线程数量
num.recovery.threads.per.data.dir=1
# segment 文件保留的最长时间,超时将被删除(默认7天)
log.retention.hours=168
# 配置连接 Zookeeper 集群地址
zookeeper.connect=zookeeper101:2181,zookeeper102:2181,zookeeper103:2181
实操
- pom.xml
<dependency>
<!--导入kafka作为输出消息队列-->
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.9.1</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.9.1</version>
<scope>test</scope>
</dependency>
- application.properties
# Kafka服务的地址
spring.kafka.bootstrap-servers=localhost:9092
# 若设置大于0的值,客户端会将发送失败的记录重新发送
spring.kafka.producer.retries=0
# kafka producer发送消息最大值
spring.kafka.producer.properties.max.request.size=10485760
# kafka 序列化方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
有时候会报错:The request included a message larger than the max message size the server will accept 说明发送的消息太大,kafka server拒绝接收
修改配置文件:
vim /opt/kafka/config/server.properties
# server
message.max.bytes=20971520
# producer
max.request.size=20971520
# consumer
max.partition.fetch.bytes=20971520
- Controller
@RestController
@Slf4j
public class ReceiveDataController {
@Resource
private KafkaTemplate<Object, Object> template;
/**
* 发送kafka
* @param raw
* @return
*/
@RequestMapping(value = "/sender", method = RequestMethod.POST)
public void addData(@RequestParam("raw") String raw) {
template.send("topic_input", raw);
}
/**
* 监听kafka的输入
* @param input
*/
@KafkaListener(id = "myGroup", topics = "topic_input")
public void listen(String input) {
log.info("input value: {}" , input);
}
}
参考
[ 1 ] https://www.cnblogs.com/angelyan/p/14445710.html
[ 2 ] https://www.cnblogs.com/wendyw/p/13034083.html
[ 3 ] https://juejin.cn/post/6844903495670169607
文章标题:Kafka入门
文章链接:http://120.46.217.131:82/archives/45/
最后编辑:2022 年 10 月 10 日 13:47 By Yang
许可协议: 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)