使用kafka-clients操作数据(java)

这篇具有很好参考价值的文章主要介绍了使用kafka-clients操作数据(java)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、添加依赖

     <!--    kafka-clients-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.5.1</version>
        </dependency>

二、生产者

自定义分区,可忽略文章来源地址https://www.toymoban.com/news/detail-615966.html

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class MyPatitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        String msgStr = value.toString();
        if(msgStr.contains("a")){
            return 1;
        }
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

1、普通消息

 public static void main(String[] args) throws ExecutionException, InterruptedException {
        //配置
        Properties properties = new Properties();
        //连接参数
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.157.130:9092");
        //序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //关联自定义分区器 可选
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.minos.kafka.producer.MyPatitioner");

        //优化参数 可选
        //缓冲器大小 32M
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 30 * 1024 * 1024);
        //批次大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
        //Linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        //压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        //acks
        properties.put(ProducerConfig.ACKS_CONFIG, "-1");
        //重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);


        //创建生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        //异步发送数据
        for (int i = 0; i < 10; i++) {
            //给first主题发消息
            kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i));
            //回调异步发送
            kafkaProducer.send(new ProducerRecord<String, String>("first", "hello2" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("主题:" + recordMetadata.topic() + "分区:" + recordMetadata.partition());
                    }
                }
            });
            kafkaProducer.send(new ProducerRecord<String, String>("first", "a" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("主题:" + recordMetadata.topic() + "分区" + recordMetadata.partition() + "a");
                    }
                }
            });
            Thread.sleep(500);
        }

        //同步
        for (int i = 0; i < 10; i++) {
            //给first主题发消息
            kafkaProducer.send(new ProducerRecord<String, String>("first", "sync_hello" + i)).get();
        }

        //关闭资源
        kafkaProducer.close();
    }
root@ubuntu2203:/usr/local/kafka_2.12-3.5.1/bin# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
a0
hello0
hello20
a1
hello1
hello21
a2
hello2
hello22
a3
hello3
hello23
a4
hello4
hello24
a5
hello5
hello25
a6
hello6
hello26
a7
hello7
hello27
a8
hello8
hello28
a9
hello9
hello29
sync_hello0
sync_hello1
sync_hello2
sync_hello3
sync_hello4
sync_hello5
sync_hello6
sync_hello7
sync_hello8
sync_hello9

2、事务消息

 public static void main(String[] args) throws ExecutionException, InterruptedException {
        //配置
        Properties properties = new Properties();
        //连接参数
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.157.130:9092");
        //序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //关联自定义分区器 可选
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.minos.kafka.producer.MyPatitioner");

        //优化参数 可选
        //缓冲器大小 32M
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 30 * 1024 * 1024);
        //批次大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);
        //Linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        //压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        //acks
        properties.put(ProducerConfig.ACKS_CONFIG, "-1");
        //重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);

        //指定事务ID
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactional_id_01");
        properties.put("enable.idempotence", "true");

        //创建生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        //事务消息 初始化
        kafkaProducer.initTransactions();
        //开始事务
        kafkaProducer.beginTransaction();
        try {
            kafkaProducer.send(new ProducerRecord<String, String>("first", "Transactions")).get();
            //提交事务
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
            //终止事务
            kafkaProducer.abortTransaction();
        } finally {
            //关闭资源
            kafkaProducer.close();
        }
    }
root@ubuntu2203:/usr/local/kafka_2.12-3.5.1/bin# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
Transactions

到了这里,关于使用kafka-clients操作数据(java)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 在Spring Boot微服务集成Kafka客户端(kafka-clients)操作Kafka

    记录 :459 场景 :在Spring Boot微服务集成Kafka客户端kafka-clients-3.0.0操作Kafka。使用kafka-clients的原生KafkaProducer操作Kafka生产者Producer。使用kafka-clients的原生KafkaConsumer操作Kafka的消费者Consumer。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka安装 :https://blog.csdn.ne

    2024年02月12日
    浏览(46)
  • 通过Java client访问Kafka

    1. Install Kafka 1) download kafka binary from https://kafka.apache.org/downloads 2) extract binary 2. Start Kafka 1) start zookeeper in daemon mode 2) start kafka server in daemon mode 3. Test Kafka 1) create a topic 2) producer events 3) consumer events 4. Access Kafka from Java client 1) download kafka client binary from https://jar-download.com/artifacts

    2024年02月05日
    浏览(12)
  • 大数据之Kafka————java来实现kafka相关操作

    一、在java中配置pom 二、生产者方法 (1)、Producer Java中写在生产者输入内容在kafka中可以让消费者提取 [root@kb144 config]# kafka-console-consumer.sh --bootstrap-server 192.168.153.144:9092 --topic kb22 (2)、Producer进行多线程操作   生产者多线程是一种常见的技术实践,可以提高消息生产的并发性

    2024年02月11日
    浏览(8)
  • C#:了解LINQ,简化数据查询和操作的强大工具

    C#:了解LINQ,简化数据查询和操作的强大工具

    以下是 LINQ(Language Integrated Query)中常见的及其作用,并给出一个示例以展示其执行结果: from :用于指定数据源,可以是集合、数组、数据库表等。 示例: where :用于筛选满足指定条件的元素。 示例: select :用于选择返回的结果集。 示例: orderby :用于对结果集

    2024年02月12日
    浏览(15)
  • 使用spring-kafka的Java API操作Kafka集群的Topic

    记录 :462 场景 :在Spring Boot微服务集成spring-kafka-2.8.2操作Kafka集群的Topic的创建和删除。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka集群安装 :https://blog.csdn.net/zhangbeizhen18/article/details/131156084 1.微服务中 配置Kafka信息 1.1在pom.xml添加依赖 pom.xml文件: 解析:

    2024年02月10日
    浏览(13)
  • 使用Kafka客户端(spring-kafka)的Java API操作Kafka的Topic

    记录 :458 场景 :在Spring Boot微服务集成Kafka客户端spring-kafka-2.8.2操作Kafka的Topic的创建和删除。 版本 :JDK 1.8,Spring Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。 Kafka安装 :https://blog.csdn.net/zhangbeizhen18/article/details/129071395 1.微服务中 配置Kafka信息 1.1在pom.xml添加依赖 pom.xml文件: 解析

    2024年02月09日
    浏览(9)
  • 使用flink的sql-client.sh,测试mysql-->kafka-->kafka-->mysql实时流

    使用flink的sql-client.sh,测试mysql-->kafka-->kafka-->mysql实时流

    目录 1. 环境介绍 2. mysql建表 3. flinksql建表 3.1 进入flinksql客户端  ​3.2 配置输出格式 ​3.3 flink建表 3.4 任务流配置 4. 测试 4.1 插入测试数据 4.2 查看结果表数据​ 4.3 新增测试数据 4.4 再次查看结果表数据 服务 版本 zookeeper 3.8.0 kafka 3.3.1 flink 1.13.5 mysql 5.7.34 jdk 1.8 scala 2.12 连接器

    2024年02月11日
    浏览(14)
  • Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作

    Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作

    现有一电商网站数据文件,名为buyer_favorite1,记录了用户对商品的收藏数据,数据以“t”键分割,数据内容及数据格式如下: 项目环境说明 开启hadoop集群,zookeeper服务,开启kafka服务。再另开启一个窗口,在/apps/kafka/bin目录下创建一个topic。 1、新创一个文件folder命名为li

    2024年02月13日
    浏览(14)
  • 最新版ES8的client API操作 Elasticsearch Java API client 8.0

    最新版ES8的client API操作 Elasticsearch Java API client 8.0

    作者:ChenZhen 本人不常看网站消息,有问题通过下面的方式联系: 邮箱:1583296383@qq.com vx: ChenZhen_7 我的个人博客地址:https://www.chenzhen.space/🌐 版权:本文为博主的原创文章,本文版权归作者所有,转载请附上原文出处链接及本声明。📝 如果对你有帮助,请给一个小小的s

    2024年02月04日
    浏览(13)
  • Elasticsearch Java REST Client 批量操作(Bulk API)

    上一篇:Elasticsearch Java REST Client Term Vectors API 下一篇:Elasticsearch Java REST Client Search APIs 查询 BulkRequest可用于使用单个请求执行多个索引、更新和/或删除操作。 它需要至少一个操作添加到 Bulk 请求中: multiGetAPI 在单个 http 请求中并行执行多个请求get 。 MultiGetRequest,添加 `M

    2024年02月11日
    浏览(12)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包