kafka_0">Java操作kafka客户端
文章目录
- Java操作kafka客户端
- 3.Java操作kafka客户端
- 1.引入依赖
- 2. Kafka服务配置
- 3、生产者(Producer)实现
- 1. 基础配置与发送消息
- 2. 关键配置说明
- 4.消费者(Consumer)实现
- 1. 基础配置与消费消息
- 2. 关键配置说明
- 3.auto.offset.reset参数可选值及行为
- 1.代码示例与行为验证
- 1. 配置为 `earliest`
- 2. 配置为 `latest`
- 3. 配置为 `none`
- 2.关键注意事项
- 1. Offset 提交机制的影响
- 2. 消费者组隔离性
- 3. 命令行验证 Offset
- 3、生产环境最佳实践
- 4、常见问题解答
- Q:配置了 `latest`,为什么还能消费到旧消息?
- Q:如何让消费者组永久保留 Offset?
- 5.主题管理示例(AdminClient)
- 6.最佳实践与注意事项
- 7.关于flush和close方法的说明
来源参考的deepseek,如有侵权联系立删
kafka_4">3.Java操作kafka客户端
Java API提供以下核心接口:
- Producer API:发送消息。
- Consumer API:订阅消息。
- Streams API:流式处理。
- Admin API:管理Topic和集群。
1.引入依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
2. Kafka服务配置
确保已启动Zookeeper和Kafka服务,默认端口分别为2181
和9092
3、生产者(Producer)实现
1. 基础配置与发送消息
无需提前创建topic
java">import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerDemo {
public static void main(String[] args) {
// 1. 配置生产者参数
Properties props = new Properties();
// Broker地址
props.put("bootstrap.servers", "127.0.0.1:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 消息确认机制
props.put("acks", "all");
// 重试次数
props.put("retries", 3);
// 2. 创建生产者实例
try (Producer<String, String> producer = new KafkaProducer<>(props)) {
// 3. 构造消息并发送
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>(
"test-topic", // 主题名称
"key-" + i, // 消息键
"value-" + i // 消息值
);
// 异步发送(可改用get()同步等待)
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.printf("消息发送成功:topic=%s, partition=%d, offset=%d%n",
metadata.topic(), metadata.partition(), metadata.offset());
} else {
exception.printStackTrace();
}
});
}
producer.flush(); // 确保所有消息发送完成
}
}
}
2. 关键配置说明
参数 | 说明 |
---|---|
bootstrap.servers | Broker地址列表,多个用逗号分隔 |
key.serializer | 键的序列化类(如StringSerializer) |
value.serializer | 值的序列化类 |
acks | 消息持久化确认机制(0 /1 /all ) |
retries | 发送失败后的重试次数 |
batch.size | 批量发送的消息大小(字节) |
4.消费者(Consumer)实现
1. 基础配置与消费消息
java">import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class KafkaConsumerDemo {
public static void main(String[] args) {
// 1. 配置消费者参数
Properties props = new Properties();
props.put("bootstrap.servers", "127.0.0.1:9092");
props.put("group.id", "test-group"); // 消费者组ID
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest"); // 从最早消息开始消费
props.put("enable.auto.commit", "false"); // 关闭自动提交偏移量
// 2. 创建消费者实例
try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList("test-topic")); // 订阅主题
while (true) {
// 3. 轮询消息(超时时间100ms)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息:topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
// 4. 手动提交偏移量(同步提交)
consumer.commitSync();
}
}
}
}
可收到实时消费的消息,但队列中消息并没有移除,
- 消息保留规则由 Broker 配置控制,与消费者无关。
- 消费者 Offset 仅标记消费进度,不会删除消息。
- 通过
kafka-consumer-groups.sh
工具监控消费状态。 - 生产环境中,合理设置
log.retention.hours
和log.retention.bytes
。
2. 关键配置说明
参数 | 说明 |
---|---|
group.id | 消费者组ID,相同组内共享分区 |
auto.offset.reset | 无偏移量时的策略(earliest /latest ) |
enable.auto.commit | 是否自动提交偏移量(建议false 手动控制) |
max.poll.records | 单次poll最大消息数 |
3.auto.offset.reset参数可选值及行为
值 | 作用 | 典型场景 |
---|---|---|
earliest | 从分区的最早消息开始消费(从头消费) | 需要处理 Topic 中所有历史消息 |
latest | 从分区的最新消息开始消费(仅消费新消息) | 实时处理最新数据,忽略历史消息 |
none | 抛出异常(NoOffsetForPartitionException ) | 需要严格确保 Offset 有效性 |
1.代码示例与行为验证
1. 配置为 earliest
java">props.put("auto.offset.reset", "earliest");
参数生效的触发条件
场景 | auto.offset.reset 是否生效 | 消费起始位置 |
---|---|---|
消费者组首次启动(无 Offset) | ✅ | 根据参数值(earliest /latest ) |
Offset 已提交且有效(未过期) | ❌ | 从已提交 Offset 继续消费 |
Offset 已过期(消息被删除) | ✅ | 根据参数值重新定位 |
行为:
- 如果消费者组首次启动,会从 Topic 每个分区的第一条消息开始消费。
- 如果 Offset 过期(例如消息被删除),会从现存的最早消息开始消费。
适用场景:
- 数据回放(重放全部历史数据)
- 测试环境需要消费完整数据集
2. 配置为 latest
java">props.put("auto.offset.reset", "latest");
行为:
- 如果消费者组首次启动,只消费启动后新写入的消息。
- 如果 Offset 过期,会从当前最新消息开始消费。
适用场景:
- 生产环境实时处理(避免处理历史积压数据)
- 日志收集系统(只需最新日志)
3. 配置为 none
java">props.put("auto.offset.reset", "none");
行为:
- 如果 Offset 无效,直接抛出
NoOffsetForPartitionException
。 - 需手动处理异常或确保 Offset 始终有效。
适用场景:
- 高可靠性系统(需严格监控 Offset 有效性)
2.关键注意事项
1. Offset 提交机制的影响
- 如果启用了自动提交 (
enable.auto.commit=true
),消费者会定期提交 Offset。
重复消费风险:若消息处理失败但 Offset 已提交,会导致消息丢失。 - 推荐做法:
java"> props.put("enable.auto.commit", "false"); // 关闭自动提交
// 处理完消息后手动提交 Offset
consumer.commitSync();
2. 消费者组隔离性
- 不同group.id的 Offset 互相独立。例如:
- 消费者组 A(
group.id=group1
)配置为latest
→ 只消费新消息。 - 消费者组 B(
group.id=group2
)配置为earliest
→ 可以消费全部消息。
- 消费者组 A(
3. 命令行验证 Offset
通过 Kafka 工具查看消费者组的 Offset:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group your-group-id
输出示例:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
test-topic 0 5000 10000 5000
- LAG:未消费的消息数量。若 LAG 持续增长,说明消费速度跟不上生产速度。
3、生产环境最佳实践
- 明确业务需求:
- 需要重放数据 →
earliest
- 仅处理实时数据 →
latest
- 需要重放数据 →
- 监控 Offset 提交:
- 使用
kafka-consumer-groups.sh
定期检查 LAG。 - 集成监控系统(如 Prometheus + Grafana)。
- 使用
- 防御性代码:
java"> try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 处理消息
consumer.commitSync(); // 同步提交
}
} catch (NoOffsetForPartitionException e) {
// 处理 Offset 无效的极端情况
logger.error("Offset 无效,需人工介入!", e);
}
4、常见问题解答
Q:配置了 latest
,为什么还能消费到旧消息?
- 可能原因:
消费者组之前已提交过 Offset,且当前 Offset 指向旧消息位置。 - 解决:
重置消费者组 Offset:
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group your-group-id --reset-offsets --to-latest --execute --topic test-topic
Q:如何让消费者组永久保留 Offset?
- Kafka 默认行为:
Offset 存储在内部 Topic__consumer_offsets
中,默认保留时间为 7 天。 - 修改保留策略:
# 修改 Offset 保留时间(单位:毫秒)
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
--entity-type topics --entity-name __consumer_offsets \
--alter --add-config retention.ms=604800000
5.主题管理示例(AdminClient)
java">import org.apache.kafka.clients.admin.*;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaAdminDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "127.0.0.1:9092");
try (AdminClient admin = AdminClient.create(props)) {
// 创建主题(3分区,1副本)
NewTopic newTopic = new NewTopic("test-topic2", 3, (short) 1);
CreateTopicsResult result = admin.createTopics(Collections.singleton(newTopic));
result.all().get(); // 阻塞等待创建完成
System.out.println("主题创建成功");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
6.最佳实践与注意事项
- 生产者优化:
- 启用压缩(
compression.type=snappy
)减少网络开销。 - 合理设置
batch.size
和linger.ms
提高吞吐量。
- 启用压缩(
- 消费者可靠性:
- 使用手动提交偏移量,避免消息丢失或重复消费。
- 处理
CommitFailedException
,防止因处理超时导致提交失败。
- 序列化选择:
- 默认支持String、ByteArray等序列化器。
- 复杂对象推荐使用JSON(Jackson)或Avro。
- 消费者组管理:
- 通过
kafka-consumer-groups.sh
工具监控消费进度。 - 避免频繁重平衡(Rebalance),调整
session.timeout.ms
参数。
- 通过
7.关于flush和close方法的说明
flush()
:强制发送缓冲区中所有未发送的消息(同步等待发送完成)close()
:释放生产者占用的所有资源(包括线程、网络连接、内存等)
若未调用close()
可能导致:
- 线程泄漏:生产者后台的
Sender
线程未终止 - 连接泄漏:与Broker的TCP连接未关闭
- 内存泄漏:未释放消息缓冲区内存
可通过jstack
或VisualVM
工具检查线程状态验证。
关键区别说明
方法 | 作用 | 是否必须调用 | 是否自动包含对方功能 |
---|---|---|---|
flush() | 清空发送缓冲区,确保所有消息被发送 | 可选(按需调用) | ❌ 不释放资源 |
close() | 关闭生产者并释放资源 | 必须调用 | ✅ 内部会自动调用flush() |
正确写法(推荐):
java">try (Producer<String, String> producer = new KafkaProducer<>(props)) {
producer.send(record);
producer.flush(); // 显式清空缓冲区(可选)
} // 自动调用close(),包含flush()
错误写法(资源泄漏):
java">Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(record);
producer.flush();
// 忘记调用close() → 线程/连接未释放!
最佳实践建议
1.优先使用try-with-resources(Java 7+特性):
java"> try (Producer<String, String> producer = new KafkaProducer<>(props)) {
// 发送消息...
} // 自动调用close()
这是最安全的写法,无需手动调用flush()
或close()
2.需要立即发送时:
java"> producer.send(record);
producer.flush(); // 强制立即发送(如实时系统关键消息)
// ...其他操作...
producer.close(); // 仍需显式关闭
3.不要依赖finalize():
Kafka客户端的finalize()
方法已废弃,不能保证资源释放。
4.KafkaProducer.close()
源码:
java">public void close() {
close(Duration.ofMillis(Long.MAX_VALUE)); // 默认无限等待
}
public void close(Duration timeout) {
// ...
flush(); // 内部自动调用flush()
client.close(); // 释放网络资源
metrics.close(); // 关闭监控指标
// ...
}