kafka代码示例

news/2025/2/22 0:15:40

kafka_0">安装kafka

Windows安装kafka, 详情见:https://blog.csdn.net/sinat_32502451/article/details/133067851

Linux 安装kafka,详情见:https://blog.csdn.net/sinat_32502451/article/details/133080353

添加依赖包:

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.1.10.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>

kafka_25">kafka配置:

在 application.properties 添加以下配置:

### kafka生产者
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

### kafka消费者
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.max-poll-records=1
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.concurrency=5

生产者代码:

  • KafkaProducerService :

生产者发送消息。

@Component
public class KafkaProducerService {


    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    /**
     * 发送消息,处理回调。
     * 在发送消息时会自动创建你设置的 topic。
     *
     */
    public void send()  {
        MyMsg myMsg = new MyMsg();
        myMsg.setName("lin");
        myMsg.setId("1234");

        //发送消息
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("myTopic1", "key", JSON.toJSONString(myMsg));
        //处理回调的结果,比如消息发送失败的处理。如果不需要回调,也可以不处理。
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onFailure(Throwable ex) {
                System.out.println("消息发送失败." + ex);
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                ProducerRecord<String, String> producerRecord = result.getProducerRecord();
                RecordMetadata recordMetadata = result.getRecordMetadata();
                System.out.println("消息发送成功.producerRecord:"+ JSON.toJSONString(producerRecord)
                                + ",recordMetadata:" + JSON.toJSONString(recordMetadata));

            }

        });

    }


}

  • 调用生产者发送消息:
@RestController
@RequestMapping("/")
public class KafkaController {

    @Autowired
    private KafkaProducerService kafkaProducerService;

    @PostMapping(value = "/kafka/send")
    public void send()  {
        kafkaProducerService.send();
    }

}

消费者代码:

  • KafkaConsumerService:
@Component
public class KafkaConsumerService {


    /**
     * Kafka监听器,可以监听消息。
     * 指定需要监听的 kafka 主题 topics,可以是多个topic.
     * 指定消费者群组 groupId,可以不写.
     *
     */
    @KafkaListener( topics = {"myTopic1"} , groupId ="myGroup")
    public void consume(ConsumerRecord<String, String> consumerRecord)  {
        System.out.println("消费者接收到信息,内容为:" + consumerRecord.value());
        System.out.println("偏移量:" +  consumerRecord.offset());

    }


}

测试结果 :

调用生产者发送消息,消费者成功接收到消息,类似如下:

消费者接收到信息,内容为:{"id":"1234","name":"lin"}
偏移量:19

http://www.niftyadmin.cn/n/5132679.html

相关文章

python 安装成功后终端显示的还是低版本

如果你下载了新版的 Python&#xff0c;但在使用时发现仍然是之前的版本&#xff0c;可能是因为新版的 Python 没有替代系统环境中的旧版 Python。 检查 PATH 环境变量&#xff1a;在命令行中输入 python --version 来查看当前默认的 Python 版本。如果显示的是旧版 Python 的…

拉扎维模拟CMOS集成电路设计西交张鸿老师课程P10~13视频学习记录

--------------------------------------------------------------------------------------------------------------------------------- p10 短沟道&#xff0c;除了沟长调&#xff0c;还可能出现速度饱和问题&#xff1b; 但是在拉扎维这本书里面没有考虑这个问题&#…

2023年中国大学生程序设计竞赛女生专场

Dashboard - 2023年中国大学生程序设计竞赛女生专场 - Codeforces K. RSP 诈骗题 / 签到题。本来用排列组合算期望发现这个平局很奇妙&#xff0c;好像期望最大的就是m-1个平局&#xff0c;一局决定胜负。 void solve() {int n,m; cin>>n>>m;cout<<"…

EASYX动画效果实现

eg1:绘制小球的动画效果 通过一下的代码实现小球从左向右移动效果&#xff0c;计算小球的移动速度和帧率实现移动效果平和造成视觉上的错觉 #include <stdio.h> #include <easyx.h> #include <iostream> #include <math.h> #define PI 3.14 // 1PI …

CAN接口的PCB Layout规则要求汇总

随着时代高速发展&#xff0c;控制器局域网&#xff08;CAN&#xff09;接口的应用越来越广泛&#xff0c;尤其是在汽车电子、航空航天等领域中发挥着重要作用&#xff0c;为了确保CAN接口的可靠性和稳定性&#xff0c;工程师必须在其PCB Layout方面下功夫&#xff0c;下面来看…

基于 Qt控制开发板 LED和C语言控制LED渐变亮度效果

## 资源简介 在STM32开发板,板载资源上有两个可自由控制的 LED。如下图原理 图其中我们以操作 LED1 为示例,LED1 为出厂系统的心跳指示灯。 ## 应用实例 想要控制这个 LED,首先出厂内核已经默认将这个 LED 注册成了 gpio-leds类型设备。所以我们可以直接在应用层接口直接…

shell_57.Linux创建自己的重定向

创建自己的重定向 1.创建输出文件描述符 可以用 exec 命令分配用于输出的文件描述符。和标准的文件描述符一样&#xff0c;一旦将替代性文件描述符指向文件&#xff0c; 此重定向就会一直有效&#xff0c;直至重新分配。 $ cat test13 #!/bin/bash # using an alternative f…

递归下降语法分析 实验实现(电子科技大学编译技术Icoding实验)

现在前面&#xff1a; 由于本人对编译技术并无兴趣&#xff08;我觉得我这种数字动漫方向的根本没必要做实验&#x1f605;&#xff09;&#xff0c;所以只完成最基本的 Icoding 上的内容&#xff0c;而全部非终结符对应函数的实现&#xff0c;我实在无心也无力&#xff0c;完…