【Kafka源码解读和使用指南】第04篇:Kafka生产者快速上手——5分钟发出你的第一条消息
2026/6/6 13:23:36 网站建设 项目流程

上一篇:【第03篇】Kafka核心概念图解——Topic、Partition、Offset、Broker一次看懂
下一篇:【第05篇】Kafka消费者快速上手——消息消费从入门到不迷路


摘要

概念看完了,不来点真格的怎么行?这一篇我们直接上手写代码——用Java把消息真正发出去。

KafkaProducer是Kafka客户端的"快递员",它的API设计得相当优雅:创建一个Producer实例,构造一条ProducerRecord,调用send()方法,搞定。但优雅归优雅,里面藏着不少门道——三种发送方式该怎么选?acks=0/1/all到底差在哪?key.serializer为什么要配?本文用完整的可运行代码,把这些问题一次性讲清楚。

读完这篇,你不仅能发出第一条消息,还能理解每种发送方式背后的性能与可靠性权衡。


一、先把依赖配上

没Maven依赖,代码写再多也跑不起来。

1.1 Maven 项目依赖

<!-- pom.xml --><dependencies><!-- Kafka Clients —— 核心依赖,Producer和Consumer都在里面 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.9.0</version></dependency><!-- SLF4J Simple —— 让Kafka内部日志输出到控制台,方便调试 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>2.0.16</version></dependency><!-- Lombok —— 可选,减少样板代码 --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.34</version><scope>provided</scope></dependency></dependencies>

1.2 Gradle 项目依赖

dependencies{implementation'org.apache.kafka:kafka-clients:3.9.0'implementation'org.slf4j:slf4j-simple:2.0.16'}

版本选择提示kafka-clients的版本最好和服务端Kafka版本保持一致(或略新),避免协议不兼容。Kafka客户端和服务端之间有很好的向前/向后兼容性,但用相同版本最稳。


二、第一种发送方式:发后即忘(Fire-and-Forget)

这是最简单的发送方式——把消息扔出去,不管结果。

packagecom.example.kafka;importorg.apache.kafka.clients.producer.*;importjava.util.Properties;publicclassQuickStartProducer{publicstaticvoidmain(String[]args){// 1. 配置Properties —— KafkaProducer的"出生证明"Propertiesprops=newProperties();// bootstrap.servers:Kafka集群入口,多个用逗号分隔// 格式:host1:port1,host2:port2props.put("bootstrap.servers","localhost:9092");// key.serializer / value.serializer:// Kafka在网络上传输的是字节数组,必须指定序列化器// StringSerializer 是最常用的,把String转为byte[]props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 2. 创建 KafkaProducer 实例(建议用 try-with-resources 自动关闭)try(KafkaProducer<String,String>producer=newKafkaProducer<>(props)){// 3. 构造一条消息(ProducerRecord)// 参数:topic, key, value// key可以为null(此时分区器会轮询选择分区)ProducerRecord<String,String>record=newProducerRecord<>("hello-kafka","key-1","Hello Kafka!");// 4. 发送!不关心结果 —— 这就是"发后即忘"producer.send(record);// ⚠️ 关键点:如果不调用 close() 或 flush(),// 消息可能还缓冲在 RecordAccumulator 里没发出去!// 所以务必调用 close() 或 producer.flush()System.out.println("消息已发送(发后即忘模式)");}// try-with-resources 会自动调用 producer.close()}}
发后即忘的执行路径: ┌──────────┐ send() ┌────────────────────┐ │ Producer │ ──────────→ │ RecordAccumulator │ ← 消息先缓存到这里 └──────────┘ └────────┬───────────┘ │ 后台Sender线程异步发送 ▼ ┌──────────────┐ │ Broker │ └──────────────┘ 结果:不等待Broker响应,直接返回 → 吞吐最高,但可能丢消息

三、第二种发送方式:同步发送(Synchronous)

同步发送会阻塞当前线程,直到收到Broker的响应(或超时)。

publicclassSyncSendProducer{publicstaticvoidmain(String[]args){Propertiesprops=newProperties();props.put("bootstrap.servers","localhost:9092");props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// acks=all 确保消息写入所有ISR副本才返回成功// 这是最严格的可靠性保证(性能也最慢)props.put("acks","all");try(KafkaProducer<String,String>producer=newKafkaProducer<>(props)){for(inti=0;i<10;i++){ProducerRecord<String,String>record=newProducerRecord<>("hello-kafka","key-"+i,"消息-"+i);try{// send() 返回一个 Future<RecordMetadata>// .get() 会阻塞,直到Broker响应RecordMetadatametadata=producer.send(record).get();// RecordMetadata 包含消息被写入的分区和offsetSystem.out.printf("消息发送成功!topic=%s, partition=%d, offset=%d%n",metadata.topic(),metadata.partition(),metadata.offset());}catch(Exceptione){// 同步发送会抛出明确的异常:// - RecordTooLargeException:消息太大// - TimeoutException:等待超时// - InterruptedException:线程被中断System.err.println("消息发送失败: "+e.getMessage());e.printStackTrace();}}}}}

同步发送的适用场景

场景原因
订单创建订单消息丢了老板会找你
支付通知钱的事不能含糊
审计日志合规要求必须确认写入

四、第三种发送方式:异步回调(Asynchronous with Callback)

这是生产环境最推荐的方式——既有性能,又能处理发送结果。

publicclassAsyncCallbackProducer{publicstaticvoidmain(String[]args)throwsInterruptedException{Propertiesprops=newProperties();props.put("bootstrap.servers","localhost:9092");props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 重试次数(默认Integer.MAX_VALUE,建议设合理值)props.put("retries",3);try(KafkaProducer<String,String>producer=newKafkaProducer<>(props)){for(inti=0;i<10;i++){intmsgId=i;// lambda需要effectively finalProducerRecord<String,String>record=newProducerRecord<>("hello-kafka","key-"+i,"异步消息-"+i);// 异步发送 + Callback回调producer.send(record,newCallback(){@OverridepublicvoidonCompletion(RecordMetadatametadata,Exceptionexception){if(exception!=null){// 发送失败:记录日志、告警、写入死信队列System.err.printf("消息[%d]发送失败: %s%n",msgId,exception.getMessage());}else{// 发送成功:metadata里有分区和offset信息System.out.printf("消息[%d]发送成功! "+"topic=%s, partition=%d, offset=%d%n",msgId,metadata.topic(),metadata.partition(),metadata.offset());}}});}// 重要:让主线程等一会,确保回调有足够时间执行// 生产环境中,应用通常是长期运行的,不需要这句Thread.sleep(2000);}}}

三种发送方式对比

方式性能可靠性适用场景
发后即忘★★★★★★★日志收集、监控埋点
同步发送★★★★★★★金融交易、订单处理
异步回调★★★★★★★★绝大多数生产场景

五、关键配置参数详解

KafkaProducer有一大堆配置参数,但真正需要关心的就这几个:

5.1 bootstrap.servers(必配)

作用:Kafka集群的"入口地址" 格式:host1:port1[,host2:port2,...] 示例:localhost:9092 或 kafka1:9092,kafka2:9092,kafka3:9092 注意: - 不需要配所有Broker地址,配2~3个就够(客户端会自动发现集群所有节点) - 如果配的Broker挂了,客户端会自动尝试列表中的其他地址

5.2 key.serializer / value.serializer(必配)

作用:指定Key和Value的序列化方式 内置序列化器: ├── StringSerializer → String → byte[] (最常用) ├── IntegerSerializer → Integer → byte[] (数字场景) ├── LongSerializer → Long → byte[] (时间戳场景) ├── ByteArraySerializer → byte[] → byte[] (已序列化好的数据) ├── DoubleSerializer → Double → byte[] (浮点数) └── 自定义Serializer → 实现 Serializer 接口

5.3 acks(可靠性核心参数)

acks=0: 消息发出去就不管了,不等待Broker任何响应 性能:最高 ★★★★★ 可靠性:最低 ★ 场景:日志收集,丢几条无所谓 acks=1(默认): 只要Partition Leader写入成功就返回成功 如果Leader挂了且Follower还没复制,会丢消息 性能:中等 ★★★ 可靠性:中等 ★★★ 场景:一般业务场景的默认选择 acks=all(或 acks=-1): 必须等待所有ISR副本都写入成功才返回 性能:最低 ★★ 可靠性:最高 ★★★★★ 场景:金融、订单、审计

5.4 retries 和 retry.backoff.ms

// 发送失败后重试次数(默认0,但建议设置)props.put("retries",3);// 每次重试的间隔(毫秒,默认100ms)props.put("retry.backoff.ms",500);

注意retries只重试可重试异常(如网络抖动、Leader切换),不可重试异常(如消息太大、认证失败)不会重试。


六、实战:订单创建事件发送

来一个真实场景——电商系统的订单创建事件。

6.1 定义订单事件模型

importlombok.AllArgsConstructor;importlombok.Data;importlombok.NoArgsConstructor;importjava.math.BigDecimal;importjava.time.LocalDateTime;@Data@NoArgsConstructor@AllArgsConstructorpublicclassOrderCreatedEvent{privateStringorderId;// 订单IDprivateStringuserId;// 用户IDprivateBigDecimalamount;// 订单金额privateStringproductId;// 商品IDprivateIntegerquantity;// 数量privateLocalDateTimecreatedAt;// 创建时间privateOrderStatusstatus;// 订单状态publicenumOrderStatus{CREATED,PAID,SHIPPED,COMPLETED,CANCELLED}}

6.2 用JSON序列化发送(生产环境标准做法)

importcom.fasterxml.jackson.databind.ObjectMapper;publicclassOrderEventProducer{privatefinalKafkaProducer<String,String>producer;privatefinalObjectMapperobjectMapper=newObjectMapper();privatefinalStringtopic="order-events";publicOrderEventProducer(){Propertiesprops=newProperties();props.put("bootstrap.servers","localhost:9092");// 生产环境推荐用StringSerializer + JSON字符串// 比自定义Serializer更灵活,且和语言无关props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 开启幂等生产者(防止网络重试导致消息重复)props.put("enable.idempotence","true");this.producer=newKafkaProducer<>(props);}publicvoidsendOrderCreatedEvent(OrderCreatedEventevent){try{StringeventJson=objectMapper.writeValueAsString(event);ProducerRecord<String,String>record=newProducerRecord<>(topic,event.getOrderId(),eventJson);producer.send(record,(metadata,exception)->{if(exception!=null){// 生产环境:写错误日志 + 告警 + 死信队列System.err.println("订单事件发送失败! orderId="+event.getOrderId());}else{System.out.println("订单事件发送成功! orderId="+event.getOrderId()+", partition="+metadata.partition()+", offset="+metadata.offset());}});}catch(Exceptione){System.err.println("序列化订单事件失败: "+e.getMessage());}}publicvoidclose(){producer.close();}// 测试publicstaticvoidmain(String[]args){OrderEventProducerproducer=newOrderEventProducer();OrderCreatedEventevent=newOrderCreatedEvent("ORD-20260530-001","USR-001",newBigDecimal("299.99"),"PROD-888",2,LocalDateTime.now(),OrderCreatedEvent.OrderStatus.CREATED);producer.sendOrderCreatedEvent(event);producer.close();}}

本篇小结

本文从零开始,带你用Java发出了第一条Kafka消息:

  1. 三种发送方式,各有适用场景——发后即忘最快,同步最稳,异步回调是生产环境首选
  2. acks参数是可靠性的总开关——0/1/all,每个选择背后都是吞吐和可靠性的权衡
  3. 关键配置心中有数——bootstrap.servers是入口,serializer是序列化,"enable.idempotence=true"防重复
  4. 实战订单事件——用JSON序列化 + 异步回调,这是生产环境的标准姿势
  5. 下一站是消费者——消息发出去了,总得有人来消费吧?

上一篇:【第03篇】Kafka核心概念图解——Topic、Partition、Offset、Broker一次看懂
下一篇:【第05篇】Kafka消费者快速上手——消息消费从入门到不迷路


需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询