Kafka3.0.0版本——消费者(自动提交 offset)

news/2024/6/18 1:50:51 标签: kafka

目录

    • 一、自动提交offset的相关参数
    • 二、消费者(自动提交 offset)代码示例

一、自动提交offset的相关参数

  • 官网文档
    在这里插入图片描述

  • 参数解释

    参数描述
    enable.auto.commi默认值为 true,消费者会自动周期性地向服务器提交偏移量。
    auto.commit.interval.ms如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。
  • 图解分析

    在这里插入图片描述

二、消费者(自动提交 offset)代码示例

  • 消费者自动提交 offset代码

    // 自动提交
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
    // 提交时间间隔 1秒
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
    
  • 消费者自动提交 offset代码完整代码

    package com.xz.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;
    
    public class CustomConsumerAutoOffset {
    
        public static void main(String[] args) {
    
            // 配置
            Properties properties = new Properties();
    
            // 连接 bootstrap.servers
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            // 反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // 配置消费者组id
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");
    
            // 自动提交
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
    
            // 提交时间间隔 1秒
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
    
            // 1 创建一个消费者  "", "hello"
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            // 2 订阅主题 first
            ArrayList<String> topics = new ArrayList<>();
            topics.add("sevenTopic");
            kafkaConsumer.subscribe(topics);
    
            // 3 消费数据
            while (true){
    
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                }
            }
        }
    }
    

http://www.niftyadmin.cn/n/5017635.html

相关文章

Linux - 如何启动进程、线程

Linux - 如何启动进程_linux 启动进程_夜流冰的博客-CSDN博客 1、linux中&#xff0c;可以通过系统调用 fork(), execve() 来创建进程。 2、老的linux中&#xff0c;fork()就是最底层的接口了。现代Linux中&#xff0c;fork()是通过 clone()这个系统调用实现的。所以clone()才…

计算机网络中的应用层和传输层(http/tcp)

目录 1、协议的通俗理解 1.1 理解协议 2.应用层 2.1 http协议 2.2 HTTP的方法 2.3 HTTP的状态码 2.4 HTTP常见Header 3、传输层 3.1 端口号 3.1.1 端口号范围划分 3.1.2 netstat 3.1.3 认识知名端口号(Well-Know Port Number) 3.2 UDP协议 3.2.1 UDP协议端格式 3…

UMA 2 - Unity Multipurpose Avatar☀️九.Expressions表情管理与表情插件推荐 (口型同步 / 表情管理)

文章目录 🟥 Expressions文件位置🟧 功能 : 解决嘴巴张开问题🟨 Expressions : 表情面板API讲解🟩 表情插件推荐 : 口型同步 / 表情管理🟥 Expressions文件位置 Expressions也是UMA内置5种实用Recipes之一,位置如下. 使用方法: 如下图所示,将Recipes拖到Additional…

Windows11系统安装Hyper-V虚拟平台和Window10虚拟机

一、Hyper-V虚拟平台简介 Hyper-V是微软公司提供的硬件虚拟化平台(即:可实现在一个物理硬件平台上的Windows系统上通过Hyper-V虚拟化平台可以创建多个不同的系统,方便软件开发人员、IT 专业人员或技术爱好者进行开发测试)。 Hyper-V与其他第三方虚拟机平台比较的优势 序号H…

ssrf伪造服务器请求

一.搭建环境 1.进入目录 2.打开容器如果没有就会下载 docker拉去环境 3. 地址 4.外部地址 5.使用 发现报错 6.抓包 7.发现公网可以使用 9.内网地址

前端报错合集

error Component name “index“ should always be multi-word vue/multi-word-component-names 的解决办法 原因组件命名是 没有采用驼峰 error Component name “index“ should always be multi-word vue/multi-word-component-names 的解决办法_error component name &qu…

数分面试题2-牛客

1、面对大方差如何解决 1&#xff0c;AB实验场景下&#xff0c;如果一个指标的方差较大表示它的波动较大&#xff0c;那么实验组和对照组的显著差异可能是因为方差较大即随机波动较大。解决方法有&#xff1a;PSM方法、CUPED(方差缩减) PSM代表"Propensity Score Matchin…

【PHY】3GPP UE能力类别的变化

博主未授权任何人或组织机构转载博主任何原创文章&#xff0c;感谢各位对原创的支持&#xff01; 博主链接 本人就职于国际知名终端厂商&#xff0c;负责modem芯片研发。 在5G早期负责终端数据业务层、核心网相关的开发工作&#xff0c;目前牵头6G算力网络技术标准研究。 博客…