牛客网项目——前置技术(八):Kafka

news/2024/6/19 5:44:54 标签: kafka, java, 分布式

文章目录

  • 1. 阻塞队列
  • 2. Kafka入门
    • 2.1 基本概念
    • 2.2 基本操作
  • 3. Spring整合kafka
    • 3.1 引入依赖
    • 3.2 application.properties配置
    • 3.3 Kafka测试

1. 阻塞队列

在这里插入图片描述

  1. 生产者线程
    1. 线程需要实现 Runnable 接口
    2. 重写接口的run方法
    3. 声明变量private BlockingQueue<Integer> queue接受传入的阻塞队列
    4. 创建有参构造器
    5. 实现示例逻辑,生产100个数据,put进阻塞队列,每生产一个数据停顿20毫秒,输出信息
java">class Producer implements Runnable {

    private BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 100; i++) {
                Thread.sleep(20);
                queue.put(i);
                System.out.println(Thread.currentThread().getName() + "生产:" + queue.size());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
  1. 消费者线程
    1. 线程需要实现 Runnable 接口
    2. 重写接口的run方法
    3. 声明变量private BlockingQueue<Integer> queue接受传入的阻塞队列
    4. 创建有参构造器
    5. 实现示例逻辑,不停的从队列中take,每生产一个数据停顿0-1000随机毫秒,输出信息
java">class Consumer implements Runnable {

    private BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(new Random().nextInt(1000));
                queue.take();
                System.out.println(Thread.currentThread().getName() + "消费:" + queue.size());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  1. main函数
    1. 实例化阻塞队列BlockingQueue queue = new ArrayBlockingQueue(10);
    2. 实例化一个生产者线程
    3. 实例化三个消费者线程
java">public static void main(String[] args) {
        BlockingQueue queue = new ArrayBlockingQueue(10);
        new Thread(new Producer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
    }

2. Kafka入门

在这里插入图片描述

2.1 基本概念

  • Kafka简介
    1. 早先只是消息队列,慢慢扩展功能不止消息队列
    2. 消息系统:消息队列的功能,核心功能
    3. 通过日志可以分析很多内容,用户追踪等
  • Kfaka特点
    1. 高吞吐量:可以处理TB级别数据
    2. 消息持久化:把数据永久保存到类似硬盘的某一介质。硬盘空间大,价格低。误解,读取硬盘速率高与低取决于对硬盘使用,对硬盘的顺序读取效率甚至高于对内存的随机读取,Kafka利用这一点保证能处理海量数据
    3. 高可靠性:分布式的服务,可以做集群部署,有容错能力
    4. 高扩展性:集群服务器不够用了简单的加一个服务器就可以
  • Kafka术语
    1. Broker:Kafka的服务器,集群中每一台服务器成为一个Broker
    2. Zookeeper:管理集群软件,Kafka内置了Zookeeper
    3. Topic:消息队列实现的方式两种,一种点对点,如上面的BlockingQueue,生产者把消息放到一个队列里,消费者就从这里面取值,消费者可能有多个,如果A消费者取到了这个数据这数据就出队了,每个数据只被一个消费者消费;还有一种方式发布订阅方式,生产者把消息队列放到某一个位置,消息可以被多个消费者读到。生产者把消息发布到的位置(空间)就叫Topic
    4. Partition:分区,对主题位置的分区,增强了并发能力
    5. Offsrt:消息在分区内存放的索引
    6. Leader Replica:主副本,从分区读数据时,主副本做响应
    7. Follower Replica:从副本只是备份,不负责响应

2.2 基本操作

以官网下载的2.12为例。首先更改配置文件中的data地址和log地址。

  1. 启动zookeeper
    bin\windows\zookeeper-server-start.bat config\zookeeper.properties

  2. 启动kafka
    bin\windows\kafka-server-start.bat config\server.properties

  3. 创建主题
    kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

  4. 查看某一服务器端口下所有topic
    kafka-topics.bat --list --bootstrap-server localhost:9092

  5. 以生产者身份发送消息
    kafka-console-producer.bat --broker-list localhost:9092 --topic test
    在这里插入图片描述

  6. 以消费者身份读取消息
    kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
    在这里插入图片描述
    消息通讯成功
    在这里插入图片描述

kafka_116">3. Spring整合kafka

在这里插入图片描述

3.1 引入依赖

<dependency>
	<groupId>org.springframework.kafka</groupId>
	<artifactId>spring-kafka</artifactId>
</dependency>

3.2 application.properties配置

  1. 服务器端口
  2. 消费者分组id
  3. 是否自动提交消费者的偏移量
  4. 自动提交频率
# KafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=3000

3.3 Kafka测试

  1. 生产者代码
    1. 注入容器KafkaTemplate
    2. 调用方法发消息
java">@Component
class KafkaProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void sendMessage(String topic, String content) {
        kafkaTemplate.send(topic, content);
    }
}
  1. 消费者代码
    1. 注解,标明监听的主题@KafkaListener(topics = {"test"})
    2. 封装消息
java">@Component
class KafkaConsumer {

    @KafkaListener(topics = {"test"})
    public void handleMessage(ConsumerRecord record) {
        System.out.println(record.value());
    }
}
  1. Test方法
java">@Test
public void testKafka() {
    kafkaProducer.sendMessage("test", "你好");
    kafkaProducer.sendMessage("test", "在吗");

    try {
        Thread.sleep(1000 * 10);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
java">package com.nowcoder.community;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {

    @Autowired
    private KafkaProducer kafkaProducer;

    @Test
    public void testKafka() {
        kafkaProducer.sendMessage("test", "你好");
        kafkaProducer.sendMessage("test", "在吗");

        try {
            Thread.sleep(1000 * 10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

@Component
class KafkaProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void sendMessage(String topic, String content) {
        kafkaTemplate.send(topic, content);
    }

}

@Component
class KafkaConsumer {

    @KafkaListener(topics = {"test"})
    public void handleMessage(ConsumerRecord record) {
        System.out.println(record.value());
    }


}

http://www.niftyadmin.cn/n/1016135.html

相关文章

Redis(一):Bitmaps,HyperLogLog,Geospatial

文章目录1. Bitmaps1.1 简介1.2 命令1.2.1 setbit1.2.2 getbit1.2.3 bitcount1.2.4 bitop1.3 Bitmaps与set对比2. HyperLogLog2.1 简介2.2 命令2.2.1 pfadd2.2.2 pfcount2.2.3 pfmerge3. Geospatial3.1 简介3.2 命令3.2.1 geoadd3.2.2 geopos3.2.3 geodist3.2.4 georadius1. Bi…

Redis(二):Jedis实现手机验证码功能

文章目录1. 解决思路1. 生成随机6位数字验证码2. 验证码放入redis&#xff0c;设置两分钟有效3. 验证验证码4. 测试方法要求&#xff1a;1、输入手机号&#xff0c;点击发送后随机生成6位数字码&#xff0c;2分钟有效2、输入验证码&#xff0c;点击验证&#xff0c;返回成功或失…

牛客网项目——项目开发(七):发布和显示系统通知

文章目录1. 发布系统通知1.1 事件实体Event1.2 开发事件生产者1.3 开发事件消费者1.4 修改对应的Controller1.4.1 CommentController1.4.2 LikeController1.4.3 discuss-detail.html 和 discuss.js 修改1.4.4 FollowController1.5 测试2. 显示系统通知2.1 通知列表2.1.1 Messag…

Redis(三):事务,锁机制,秒杀案例

文章目录1. 事务1.1 定义1.2 Multi、Exec、discard1.3 事务的错误处理1.4 事务三特性2. 锁机制2.1 事务冲突问题2.2 悲观锁和乐观锁2.3 WATCH key [key ...]3. 秒杀案例3.1 超卖问题——乐观锁解决3.2 连接超时问题——连接池解决3.3 库存遗留问题——LUA脚本解决1. 事务 1.1 …

牛客网项目——前置技术(九):Elasticsearch

文章目录1. Elasticsearch入门1.1 相关术语1.2 安装中文分词插件1.3 通过命令行访问Elasticsearch服务器2. Spring整合Elasticsearch2.1 引入依赖2.2 配置Elasticsearch2.2.1 application.properties2.2.2 CommunityApplication2.3 Spring整合Elasticsearch2.3.1 DiscussPost2.…

牛客网项目——项目开发(八):开发社区搜索功能

文章目录1. ElasticsearchService1.1 注入bean1.2 保存&#xff08;修改&#xff09;和删除1.3 搜索 searchDiscussPost2. DiscussPostController.addDiscussPost3. CommentController.addComment4. EventConsumer.handlePublishMessage5. SearchController5.1 注入属性5.2 搜索…

Redis(四):持久化技术RDB,AOF

文章目录1. RDB&#xff08;Redis DataBase&#xff09;1.1 是什么1.2 如何执行1.3 Fork1.4 RDB持久化流程1.5 dump.rdb文件1.6 如何触发RDB快照&#xff1b;保持策略1.6 优势&劣势1.6.1 优势1.6.2 劣势2. AOF&#xff08;Append Only File&#xff09;2.1 是什么2.2 AOF流…

牛客网项目——前置技术(十):Spring Security

文章目录1. JavaEE和SpringMVC2. 简化版社区2. 配置Spring Security2.1 pom.xml引入依赖2.2 user实体类2.3 UserService2.4 SecurityConfig 配置类2.5 HomeController1. JavaEE和SpringMVC Spring Security就是通过11个Fliter进行组合管理 2. 简化版社区 只保留登录相关代码…