Kafka入门

  6 分钟   7629 字    |    

Kafka入门

简介

Kafka:是一个高吞吐量、分布式的发布-订阅消息系统。kafka是一款开源的、轻量级的、分布式、可分区和具有复制备份[Replicated]的、基于Zookeeper协调管理的分布式流平台的功能强大的消息系统。

单机吞吐量 时效性 可用性 消息可靠性 核心特点
10万级 延迟在ms级内 非常高,kafka是分布式的,一个数据多个副本,少数机器宕机后,不会丢失数据,不会导致不可用 经过参数优化配置可以做到0丢失 功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是实时上的标准。

仅仅提供较少的核心功能,但是提供超高的吞吐量,ms级别的延迟,极高的可用性以及可靠性,分布式可以任意扩展。同时kafka最好是支撑较少的topic数量即可,保证其超高的吞吐量。

概念

  1. Topic

    Topic 是用于存储消息的逻辑概念,可以看作一个消息集合。每个 Topic 可以有多个生产者向其中推送(push)消息,也可以有任意多个消费者消费其中的消息

  2. 消息

    Kafka通信的基本单位,消息由一串字节构成,其中主要由 key 和 value 构成,key 和 value 也都是 byte 数组。key的主要作用是根据一定的策略,将此消息路由到指定的分区中,这样就可以保证包含同一 key 的消息全部写入同一分区中,key 可以是 null。消息的真正有效负载是 value 部分的数据。

    为了提高效率, 消息会分批次写入 Kafka,批次就代指的是一组消息。

  3. 分区partitions

    主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 kafka 的伸缩性,单一主题中的分区有序,但是无法保证主题中所有的分区有序.

  4. 副本replication

    每个 Partition 可以有多个副本,每个副本中包含的消息是一样的。每个分区至少有一个副本,当分区中只有一个副本时,就只有 Leader 副本,没有 Follower 副本。

  5. 消费者群组

    消费者群组(Consumer Group)指的就是由一个或多个消费者组成的群体,一个群组中的消费者订阅的都是相同的主题,每个消费者接收主题一部分分区的消息。可以通过增加消费组的消费者来进行水平扩展提升消费能力

  6. 偏移量offset

    偏移量(Consumer Offset)是一种元数据,它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据。

  7. 重平衡Rebalance

    消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

安装

下载镜像

# 安装kafka需同时安装zookeeper
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka

单机启动

# 启动zookeeper
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
# 启动kafka
docker run -d --name kafka \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=[局域网ip]:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://[局域网ip]:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka

命令

#进入容器
docker exec -it ${CONTAINER ID} /bin/bash
cd opt/bin
#单机方式:创建一个主题
bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic mykafka
#运行一个生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mykafka
#运行一个消费者
bin/kafka-console-consumer.sh --zookeeper zookeeper:2181 --topic mykafka --from-beginning

配置

# 修改Server配置
vim opt/kafka/config/server.properties

#broker 的全局唯一编号,不能重复 
broker.id=0
#服务端口,默认9092
port=9092
# 是否启用删除 topic 功能使能
delete.topic.enable=true
# 处理网络请求的线程数量 
num.network.threads=3
# 用来处理磁盘 IO 的线程数量 
num.io.threads=8
# 发送套接字的缓冲区大小 
socket.send.buffer.bytes=102400
# 接收套接字的缓冲区大小 
socket.receive.buffer.bytes=102400
# 请求套接字的缓冲区大小 
socket.request.max.bytes=104857600
# kafka 运行日志存放的路径(也是消息的存放地址)
log.dirs=/opt/module/kafka/logs 
# topic 在当前 broker 上的分区个数
num.partitions=1
# 用来恢复和清理 data 下数据的线程数量 
num.recovery.threads.per.data.dir=1 
# segment 文件保留的最长时间,超时将被删除(默认7天)
log.retention.hours=168
# 配置连接 Zookeeper 集群地址 
zookeeper.connect=zookeeper101:2181,zookeeper102:2181,zookeeper103:2181

实操

  1. pom.xml
<dependency>
  <!--导入kafka作为输出消息队列-->
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.9.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>2.9.1</version>
    <scope>test</scope>
</dependency>
  1. application.properties
# Kafka服务的地址
spring.kafka.bootstrap-servers=localhost:9092
# 若设置大于0的值,客户端会将发送失败的记录重新发送
spring.kafka.producer.retries=0
# kafka producer发送消息最大值
spring.kafka.producer.properties.max.request.size=10485760
# kafka 序列化方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
有时候会报错:The request included a message larger than the max message size the server will accept 说明发送的消息太大,kafka server拒绝接收

修改配置文件:

vim /opt/kafka/config/server.properties
# server
message.max.bytes=20971520
# producer
max.request.size=20971520
# consumer
max.partition.fetch.bytes=20971520
  1. Controller
@RestController
@Slf4j
public class ReceiveDataController {
    @Resource
    private KafkaTemplate<Object, Object> template;
		/**
     * 发送kafka
     * @param raw
     * @return
     */
    @RequestMapping(value = "/sender", method = RequestMethod.POST)
    public void addData(@RequestParam("raw") String raw) {
        template.send("topic_input", raw);
    }

    /**
     * 监听kafka的输入
     * @param input
     */
    @KafkaListener(id = "myGroup", topics = "topic_input")
    public void listen(String input) {
        log.info("input value: {}" , input);
    }
}

参考

[ 1 ] https://www.cnblogs.com/angelyan/p/14445710.html

[ 2 ] https://www.cnblogs.com/wendyw/p/13034083.html

[ 3 ] https://juejin.cn/post/6844903495670169607

[ 4 ] https://stackoverflow.com/questions/55181375/org-apache-kafka-common-errors-recordtoolargeexception-the-request-included-a-m

~  ~  The   End  ~  ~


 赏 
感谢您的支持,我会继续努力哒!
支付宝收款码
tips
文章二维码 分类标签:技术消息队列kafka
文章标题:Kafka入门
文章链接:http://120.46.217.131:82/archives/45/
最后编辑:2022 年 10 月 10 日 13:47 By Yang
许可协议: 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)

相关推荐

热门推荐

(*) 2 + 3 =
快来做第一个评论的人吧~