【超级详细】熟悉Kafka的基本使用方法的实验【Windows】

这篇具有很好参考价值的文章主要介绍了【超级详细】熟悉Kafka的基本使用方法的实验【Windows】。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。


前言

Kafka 是由 Apache 软件基金会开发的一个开源消息队列平台,它是一种高性能、可扩展、分布式的发布-订阅消息系统。Kafka 的架构被设计为高效、低延迟,并具有高吞吐量、持久性和可靠性。

在 Kafka 中,生产者将消息发布到主题(topic)中,消费者则从主题中消费消息,使用者可以将其看作一个 highly scalable 分布式 commit log 或者消息系统 (Messaging system),每个消息包含一个 key,一个 value 和一个额外的 timestamp。消息保留时间通过配置进行控制,当时间或空间满了的时候就根据策略来清除老数据,默认情况下老数据只保存 7 天。

特点:
1.高吞吐量:Kafka 在发布-订阅消息方面具有非常高的性能。它可以几乎实时地处理高速流入的大量数据。
实时处理:Kafka 能够处理高达数以百万计的消息,并准确地将消息排序和在群组内进行调度。
2.持久性和可靠性:与传统的消息系统不同,Kafka 具有持久性和可靠性。客户端自己提交当前偏移量,避免了可能出现的重复读取问题。
3.可扩展性:Kafka 可以在不繁琐的配置或修改信息格式等环节就能进行扩展。
4.多样化数据类型和来源:通过使用支持多种编程语言和操作系统的 API,Kafka 可以连接到许多各种来源的应用程序。

总之,Kafka 具有高性能、低时延,适合处理大规模物联网设备、日志、报警信息、传感器数据、消息等。

所以今天就来写一份关于熟悉Kafka的基本使用方法的实验,希望可以与小伙伴们一起探讨~~😉😉


一、实验平台

(1)操作系统:Windows7及以上(我用的是Windows 11)
(2)Kafka版本:kafka_2.12-2.4.0
(3)MySQL版本:8.0

二、实验内容

一、Kafka与MySQL的组合使用

1.实验要求

假设有一个学生表student,如下表所示,编写Python程序完成如下操作。
(1)读取student表的数据内容,将其转换为JSON格式,发送给Kafka
(2)从Kafka中获取JSON格式数据,打印出来

sno sname ssex sage
95001 John M 23
95002 Tom M 23

2.在MySQL中操作

(1)打开MySQL
方式一:
kafka windows,python,kafka,python,人工智能,大数据,开发语言
方式二:

可以通过 DOS 命令启动 MySQL 服务,windows+R,在搜索框中输入cmd,进去之后再输入services.msc,就进去服务系统里了,再启动就行
kafka windows,python,kafka,python,人工智能,大数据,开发语言
kafka windows,python,kafka,python,人工智能,大数据,开发语言
进去以后输入密码就可以开始执行mysql语句了
(2)创建数据库

create database school001;

(3)查看数据库

show databases;

发现数据库已经被创建完成
kafka windows,python,kafka,python,人工智能,大数据,开发语言
(4)使用该数据库

use school001;

(5)在该数据库中创建student表

create table student(sno varchar(10),sname varchar(20),ssex char(2),sage int(5));

(6)查询该数据库中的student表

show tables;

(7)向student表中插入值

insert into student values("95001","John","M",23);
insert into student values("95002","Tom","M",23);

(8)查询student表中的数据

select * from student;

查询结果:
kafka windows,python,kafka,python,人工智能,大数据,开发语言
(到这里我们的student表就创建成功了!)😊😊

3.安装Kafka

简单介绍:
Kafka 的运行需要 Java 环境的支持,因此,安装 Kafka 前需要在 Windows 操作系统中安装 JDK
访问 Kafka 官网,下载 Kafka 2.4.0的安装文件 kafka 2.12-2.4.0.1gz,解压缩到" C : \ "目录下(也可以放到D盘,不过最好放在D盘根目录下,不然后续代码容易报错,我试过)
因为 Katka 的运行依赖于 Zookeeper ,因此,还需要下载并安装 Zookeeper 。当然, Kafka 也内置了 Zookeeper 服务,因此,也可以不额外安装 Zookeeper ,直接使用内置的Zookeeper 服务。为简单起见,这里直接使用内置的Zookeeper 服务。

win+r—>输入cmd然后回车

输入命令pip install kafka-python安装python-kafka模块

查看我们安装的模块的版本信息(出现kafka-python2.0.2表示我们安装模块成功)

具体怎么安装可参考:kafka安装部署

4.使用Kafka

在实验中要用到Kafka就要先启动它的Zookeeper服务和Kafka,且在实验过程中,千万不可以将其关闭,一旦关闭,服务就会停止😡😡
在 Windows 操作系统中
打开第1个 cmd 命令行窗口,启动 Zookeeper 服务

cd D:\kafka_2.12-2.4.0(这个是你安装kafka的路径)
.\bin\windows\zookeeper-server-start.bat .\config\zookeeper.Properties

注意,执行上面的命令以后, cmd 命令行窗口中会返回一堆信息,然后停住不动,没有回到命令提示符状态。这时,不要误以为是死机,这表示 Zookeeper 服务器已经启动,正处于服务状态。所以,不要关闭这个 cmd 命令行窗口,一旦关闭, Zookeeper 服务就会停止
如图:
kafka windows,python,kafka,python,人工智能,大数据,开发语言
打开第2个 cmd 命令行窗口,然后输入如下命令启动 Kafka 服务

cd D:\kafka_2.12-2.4.0(这个是你安装kafka的路径)
.\bin\windows\kafka-server-start.bat .\config\server.Properties

同样地,执行上面的命令以后, cmd 命令行窗口中会返回一堆信息,然后停住不动,没有回到命令提示符状态。这时,不要误以为是死机,这表示 Kafka 服务器已经启动,正处于服务状态。所以,不要关闭这个 cmd 命令行窗口,一旦关闭, Kafka 服务就会停止
kafka windows,python,kafka,python,人工智能,大数据,开发语言
若执行上面的命令以后,如果启动失败,并且出现提示信息"此时不应有\QuickTime\QTSstem\QTJava.zip ",则需要把环境变量 CLASSPATH 的相关信息删除。具体方法是,

右键单击"计算机",再单击"属性"一"高级系统设置"一"环境变量",然后,找到变量 CLASSPATH ,把类似下面的信息删除:
C : Program Files (x86) QuickTime\QTSystem QTJava . zip

然后重新启动计算机,让配置修改生效。重新启动计算机以后,再次按照上面的方法启动Zookeeper和Kafka

为了测试 Kafka ,这里创建一个主题,名称为" topic_test ",其包含一个分区,只有一个副本。在第3个 cmd 命令行窗口中执行如下命令

cd D:\kafka_2.12-2.4.0(这个是你安装kafka的路径)
.\bin\windows\kafka-topics.bat -- create -- zookeeper localhost:2181-- replication -
 factor 1-- partitions 1-- topic topic_test 

kafka windows,python,kafka,python,人工智能,大数据,开发语言
可以继续执行如下命令,查看 topic _ test 是否创建成功:

.\bin\windows\kafka-topics.bat -- list -- zookeeper localhost:2181

kafka windows,python,kafka,python,人工智能,大数据,开发语言
如果创建成功,就可以在执行结果中看到 topic _ test
继续在第3个 cmd 命令行窗口中执行如下命令,创建一个生产者来产生消息

.\bin\windows\kafka-console-producer.bat -- broker-list localhost :9092 -topic topic_test 

该命令执行以后,屏幕上的光标会持续闪烁,这时,可以用键盘输入一些内容,例如:
I love Kafka
Kafka is good
新建第4个 cmd 命令行窗口,执行如下命令来消费消息

cd D:\kafka_2.12-2.4.0(这个是你安装kafka的路径)
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic_test --from-beginning 

该命令执行以后,屏幕上显示刚才输入的语句" I love Kafka “和” Kafka is good "

5.在PyCharm中操作

  1. 创建一个.py文件,写入以下代码,用于实现读取student表的数据内容,将其转换为JSON格式,发送给Kafka的功能
# 运行前先在win上启动zookeap和kafka
# 导入相关模块
from kafka import KafkaProducer
import json

# 连接kafka  json.dumps(v).encode('utf-8')将json格式的数抠转挨为字节类型,然后使用ut了-8进行编码
producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))
# 定义一个json格式的数第,json格式以键值对形式保存数掂,每个键值对之间使用逗号隔开
data = {
    'sno': '95001',
    'sname': 'John',
    'ssex': 'M',
    'sage': 23
}
# 发送数据
producer.send('test001', data)
# 关闭资源
producer.close()

运行结果如下图所示:
kafka windows,python,kafka,python,人工智能,大数据,开发语言

  1. 创建一个.py文件,写入以下代码,用于实现从Kafka中获取JSON格式数据,打印出来的功能
# 运行前先在win上启动mysql
# 导入消费模块
import json
# 导入kafka的消费模块
from kafka import KafkaConsumer
import json
import pymysql.cursors

# 连接kafka
consumer = KafkaConsumer('test001', bootstrap_servers='localhost:9092', group_id=None, auto_offset_reset='earliest')
# 对获取的数据进行解析
for msg in consumer:
    # 转换为字符串类型
    msg1 = str(msg.value, encoding=('utf-8'))
    # 将字符串的数据加载为字典
    dict = json.loads(msg1)
    # 连接数据库
    connect = pymysql.Connect(
        host='localhost',
        port=3306,
        user='root',
        passwd='xxxxxxxx',#这是你MySQL数据库的密码
        db='school001',
        charset='utf8'
    )
    # 获取操作数抠库的对象<游标>
    cursor = connect.cursor()
    # 将数抠织存到mysqL(插入数掷)
    # 定义sql语句
    sql = "select * from student;"
    # 将数据作为参数传速给sqL,保存到hrgsql
    cursor.execute(sql)
    # 提交
    connect.commit()
    for row in cursor.fetchall():
        print("sno:%s\tsname:%s\tssex:%s\tsage:%d" % row)
    print("共查询出", cursor.rowcount, '条数据')
    connect.close()

运行结果如下图所示:
kafka windows,python,kafka,python,人工智能,大数据,开发语言

二、消费者手动提交

1.实验要求

生成一个data.json文件,内容如下:
data = [
{“name”: “Tony”, “age”: 21, “hobbies”: [“basketball”, “tennis”]},
{“name”: “Lisa”, “age”: 20, “hobbies”: [“sing”, “dance”]},
]
根据上面给出的data.json文件,执行如下操作。
(1)编写生产者程序,将JSON文件数据发送给Kafka。
(2)编写消费者程序,读取Kafka的JSON格式数据,并手动提交偏移量。

2.在PyCharm中操作

  1. 创建一个Test写入以下代码,来实现生成data.json文件的功能:
import json

data = [
    {"name": "Tony", "age": 21, "hobbies": ["basketball", "tennis"]},
    {"name": "Lisa", "age": 20, "hobbies": ["sing", "dance"]},
]

with open('../../data.json', 'w') as f:
    json.dump(data, f)
  1. 创建一个.py文件,编写生产者程序,来实现将JSON文件数据发送给Kafka的功能
# 可以使用 Python 的 json 模块读取 data.json 文件,并将数据转换为字符串后发送给 Kafka

from kafka import KafkaProducer
import json

data = [
    {
        "name": "Tony",
        "age": 21,
        "hobbies": ["basketball", "tennis"]
    },
    {
        "name": "Lisa",
        "age": 20,
        "hobbies": ["sing", "dance"]
    }
]

producer = KafkaProducer(bootstrap_servers='localhost:9092')
for item in data:
    # 将数据转换为字符串格式并发送给 Kafka 主题 test
    message = json.dumps(item).encode('utf-8')
    producer.send('test', value=message)

producer.close()

运行结果如下图所示:
kafka windows,python,kafka,python,人工智能,大数据,开发语言

  1. 创建一个.py文件,编写消费者程序,来实现读取Kafka的JSON格式数据,并手动提交偏移量的功能
# 我们可以使用 Kafka 消费者 API 进行数据消费,并在处理完每个消息后手动提交偏移量。

from kafka import KafkaConsumer
import json

# 配置 Kafka 消费者,指定主题和分组等信息
consumer = KafkaConsumer(
    'test',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=False,  # 禁止自动提交偏移量
    group_id='my-group')

# 循环消费 Kafka 消息
for message in consumer:
    # 将传入的二进制消息内容解码为 JSON 格式的字符串
    item = json.loads(message.value.decode('utf-8'))
    print(item)

    # 手动提交偏移量,确保下次消费时从正确的位置开始
    consumer.commit()

运行结果如下图所示:
kafka windows,python,kafka,python,人工智能,大数据,开发语言

三、Kafka消费者订阅分区

1.实验要求

在命令行窗口中启动Kafka后,手动创建主题 “assign_topic” ,分区数量为2。具体命令如下:

.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic assign_topic

根据上面给出的主题,完成如下操作。
(1)编写生产者程序,以通过唯一标识符UUID作为消息,发送给主题 “ assign_topic” 。
(2)编写消费者程序1,订阅主题的分区0,只消费分区0数据。
(3)编写消费者程序2,订阅主题的分区1,只消费分区1数据。

2.在终端操作

首先要完成主题以及分区的创建才能编写程序,不然程序会报错
步骤:

  1. 使用windows+r,在弹窗中输入cmd打开终端
  2. 在终端中输入命令,创建主题和分区:
cd D:\kafka_2.12-2.4.0(这个是你安装kafka的路径)
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic assign_topic

结果如下图(这是我之前已经创建好的结果图):kafka windows,python,kafka,python,人工智能,大数据,开发语言

3.在PyCharm中操作

  1. 创建一个.py文件,写入以下代码,用于实现编写生产者程序,以通过唯一标识符UUID作为消息,发送给主题 “ assign_topic的功能:
from kafka import KafkaProducer
import uuid

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

for i in range(5):
    message = str(uuid.uuid4()).encode('utf-8')
    producer.send('assign_topic', value=message)

producer.close()

运行结果如下图所示:
kafka windows,python,kafka,python,人工智能,大数据,开发语言

  1. 创建一个.py文件,写入以下代码,用于实现订阅主题的分区0,只消费分区0数据的功能:
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    consumer_timeout_ms=1000
)
consumer.assign([TopicPartition('assign_topic', 0)])

for message in consumer:
    print("Partition 0 - Message value: {}".format(message.value))

consumer.close()

运行结果如下图所示:
kafka windows,python,kafka,python,人工智能,大数据,开发语言

  1. 创建一个.py文件,写入以下代码,用于实现订阅主题的分区1,只消费分区1数据的功能:
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    consumer_timeout_ms=1000
)
consumer.assign([TopicPartition('assign_topic', 1)])

for message in consumer:
    print("Partition 1 - Message value: {}".format(message.value))

consumer.close()

运行结果如下图所示:
kafka windows,python,kafka,python,人工智能,大数据,开发语言

三、实验小bug

1. Kafka连接报错:kafka.errors.NoBrokersAvailable: NoBrokersAvailable 是什么原因:?
答:是因为程序运行了多次的原因
把tmp文件和logs文件里面的东西都删掉,就可以解决了
kafka windows,python,kafka,python,人工智能,大数据,开发语言
kafka windows,python,kafka,python,人工智能,大数据,开发语言
2. 为什么消费者程序1中有东西输出而消费者程序2中什么却什么也没输出?

消费者程序1和消费者程序2是对同一个主题的两个消费者应用程序。可以针对以下情况进行分析。
在主题 assign_topic 中,Kafka有多个分区,可用于并行处理消息。在这里,被消费的消息都来自此主题的第一个分区(即分区 0)。
消费者程序1使用了 .subscribe() 方法来订阅主题,这将导致消费者加入到消费组中,然后通过负载均衡策略从所有分区接收消息。因此,消费者程序1输出打印了分区 0 中的消息。
消费者程序2使用了 .assign() 方法手动分配消费者处理的分区,而且只分配了主题 assign_topic 的第一个分区(即分区 0)。但是,由于该程序没有运行足够长的时间,并且没有消费到任何未提交的偏移量,所以当应用程序终止时不会向Kafka服务器发送任何提交请求,这就可能导致在下一次启动时重复消费确认过的消息。因此,在生产环境中,请务必根据具体情况定期地提交所消费的分区的偏移量。


总结

以上就是对Kafka的基本使用方法的实验啦,有不明白的地方可以留言哦,希望能共同进步~~😀😀😀😀😀😀文章来源地址https://www.toymoban.com/news/detail-732112.html

到了这里,关于【超级详细】熟悉Kafka的基本使用方法的实验【Windows】的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • c#客户端Kafka的使用方法

    c#客户端Kafka的使用方法

    Apache Kafka是一个分布式流处理平台,最初由LinkedIn开发,现在是Apache软件基金会的顶级项目之一。Kafka能够处理大规模的实时数据流,支持高可靠性、高可扩展性、低延迟和高吞吐量。它主要用于构建实时数据管道和流式处理应用程序。 Kafka的核心概念包括:Producer(生产者)

    2024年02月12日
    浏览(9)
  • vim基本使用方法

    vim是linux上一个有多个编辑模式的编辑器。 这里主要介绍三种模式: 命令模式(Normal mode) 执行命令的模式,主要任务就是控制光标移动、复制和删除。 插入模式(Insert mode) 可以进行文字输入,编写代码模式。 末行/底行模式(last line mode) 文件保存退出,文本替换、列出

    2024年02月12日
    浏览(12)
  • docker基本使用方法

    docker基本使用方法

    Docker 可以让开发者打包他们的应用以及依赖包到一个轻量级、可移植的容器中,然后发布到任何流行的 Linux 机器上,也可以实现虚拟化。Docker 使您能够将应用程序与基础架构分开,从而可以快速交付软件。通过利用 Docker 的方法来快速交付,测试和部署代码,您可以大大减

    2024年02月13日
    浏览(9)
  • Wireshark基本使用方法

    Wireshark基本使用方法

    目录 1、Wireshark介绍 1.1 Wireshark使用 1.2 支持的协议 2.Wireshark主要应用 3.Wireshark安装  4.Wireshark页面介绍 4.1 分组列表  4.2 分组详情  4.3 分组字节流  5.Wireshark导航 5.1 开始捕获分组 5.2 停止捕获分组 5.3 重新开始当前捕获 5.4、捕获选项 5.5 打开以保存的捕获文件 5.6 保存捕

    2024年02月13日
    浏览(9)
  • uCharts基本使用方法

    uCharts基本使用方法

    首先下载ucharts文件 https://gitee.com/uCharts/uCharts 下载下来看到有这些文件,小伙伴们可以先去示例项目里面看 引入u-charts.js文件,主要构建就是new uCharts和配置context,其他的就跟其他charts配置一样 可以看例子写的,也可以自己试验一波 方法写入两种方式 第一种方式 ucharts下载

    2024年02月04日
    浏览(8)
  • pandas库基本使用方法

    Pandas是Python中一个非常流行的数据处理库,它提供了一些强大的数据结构和数据分析工具,可以帮助我们更方便、快捷地处理数据。下面我们来介绍一下Pandas的使用方法。 1.导入Pandas库 在使用Pandas之前,需要先导入Pandas库。通常的做法是使用import语句导入Pandas库,并给它起一

    2024年02月10日
    浏览(7)
  • anaconda 创建虚拟环境、激活,使用的基本方法及安装包的基本方法

    anaconda 创建虚拟环境、激活,使用的基本方法及安装包的基本方法

    第一步 打开Anaconda Prompt 可以看到这里是base环境。 第二步 我们现在要创建一个新的虚拟环境,名叫test,且python版本为3.8 在安装过程中会出现下面这个选项,输入y就好了 创建成功如下图所示!hiahia! 我们已经学会如何创建新的环境了!没错!我们非常棒!下面我们就看看,

    2024年03月14日
    浏览(13)
  • Loguru基本、进阶使用方法小结。

    Loguru基本、进阶使用方法小结。

    loguru是一个开源的Python日志记录器,它提供了简单且易于使用的接口,同时具有高度的可定制性。loguru的特点包括:支持格式化日志、记录到文件或终端、支持自动清理日志、支持旋转日志等。 loguru的基本使用方法非常简单,只需要导入loguru模块,并使用logger函数创建一个日

    2024年02月16日
    浏览(22)
  • java反射的基本使用方法

    当我们使用 Java 开发时,有时需要获取某个类的信息,例如类的属性、方法和构造函数等,然后在程序运行期间动态地操作它们。Java 反射就是用来实现这个目的的一种机制。 Java 反射(Reflection)是 Java 编程语言在运行时动态获取类的信息以及动态调用对象方法的能力。它可

    2024年02月16日
    浏览(12)
  • Pytorch基本概念和使用方法

    Pytorch基本概念和使用方法

    目录 1 Adam及优化器optimizer(Adam、SGD等)是如何选用的? 1)Momentum 2)RMSProp 3)Adam 2 Pytorch的使用以及Pytorch在以后学习工作中的应用场景。 1)Pytorch的使用 2)应用场景 3 不同的数据、数据集加载方式以及加载后各部分的调用处理方式。如DataLoder的使用、datasets内置数据集的使

    2024年02月07日
    浏览(13)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包