一、环境准备
1.1 开发环境要求
| 组件 | 版本要求 | 说明 |
|---|---|---|
| JDK | 1.8+ | Flink 1.17支持Java 8/11/17 |
| Maven | 3.6+ | 项目构建工具 |
| IDEA | 2020+ | 推荐IntelliJ IDEA |
| Flink | 1.17.0 | 本文使用版本 |
1.2 检查Java环境
$java-versionopenjdk version"1.8.0_352"OpenJDK Runtime Environment(build1.8.0_352-b08)OpenJDK64-Bit Server VM(build25.352-b08, mixed mode)二、创建Flink Maven项目
2.1 新建Maven工程
打开IntelliJ IDEA,按以下步骤操作:
步骤1:点击File → New → Project,选择Maven项目类型
步骤2:填写项目信息
- Name:
FlinkTutorial - GroupId:
com.atguigu - ArtifactId:
FlinkTutorial - Version:
1.0-SNAPSHOT
步骤3:选择项目存储路径,点击Finish完成创建
2.2 项目结构预览
创建完成后,项目结构如下:
FlinkTutorial/ ├── pom.xml # Maven配置文件 └── src/ ├── main/ │ ├── java/ # Java源码目录 │ └── resources/ # 资源文件目录 └── test/ └── java/ # 测试代码目录三、添加Flink依赖
3.1 配置pom.xml
在pom.xml文件中添加Flink核心依赖:
<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.atguigu</groupId><artifactId>FlinkTutorial</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!-- Flink版本 --><flink.version>1.17.0</flink.version></properties><dependencies><!-- Flink流处理核心依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><!-- Flink客户端依赖(用于本地执行和提交作业) --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency></dependencies></project>3.2 依赖说明
| 依赖 | 作用 | 是否必需 |
|---|---|---|
flink-streaming-java | DataStream API核心 | ✅ 必需 |
flink-clients | 本地执行和集群提交 | 可选(本地测试建议添加) |
Maven刷新:修改pom.xml后,点击IDEA右侧Maven面板的Reload按钮,或右键项目选择
Maven → Reload Project
四、数据准备
4.1 创建输入目录和文件
在项目根目录下创建input文件夹,并新建words.txt文件:
FlinkTutorial/ ├── input/ │ └── words.txt # 输入数据文件 ├── pom.xml └── src/ └── main/ └── java/ └── com/ └── atguigu/ └── wc/ # 后续创建的包4.2 输入数据内容
在words.txt中输入以下内容:
hello flink hello world hello java五、DataSet API实现批处理WordCount(了解即可)
5.1 代码实现
⚠️注意:DataSet API从Flink 1.12开始已标记为废弃,官方推荐使用DataStream API。本节仅为了解Flink历史演进。
创建包com.atguigu.wc,新建BatchWordCount.java:
packagecom.atguigu.wc;importorg.apache.flink.api.common.typeinfo.Types;importorg.apache.flink.api.java.ExecutionEnvironment;importorg.apache.flink.api.java.operators.AggregateOperator;importorg.apache.flink.api.java.operators.DataSource;importorg.apache.flink.api.java.operators.FlatMapOperator;importorg.apache.flink.api.java.operators.UnsortedGrouping;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.util.Collector;/** * DataSet API实现批处理WordCount(已废弃,仅作了解) */publicclassBatchWordCount{publicstaticvoidmain(String[]args)throwsException{// 1. 创建批处理执行环境ExecutionEnvironmentenv=ExecutionEnvironment.getExecutionEnvironment();// 2. 从文件读取数据(按行读取,每行是一个字符串)DataSource<String>lineDS=env.readTextFile("input/words.txt");// 3. 转换数据格式:每行拆分为单词,转换为 (word, 1) 二元组FlatMapOperator<String,Tuple2<String,Long>>wordAndOne=lineDS.flatMap(neworg.apache.flink.api.common.functions.FlatMapFunction<String,Tuple2<String,Long>>(){@OverridepublicvoidflatMap(Stringline,Collector<Tuple2<String,Long>>out)throwsException{// 按空格分割每行String[]words=line.split(" ");for(Stringword:words){// 输出 (word, 1)out.collect(Tuple2.of(word,1L));}}});// 4. 按照word进行分组(按二元组的第0个字段分组)UnsortedGrouping<Tuple2<String,Long>>wordAndOneUG=wordAndOne.groupBy(0);// 5. 分组内聚合统计(按二元组的第1个字段求和)AggregateOperator<Tuple2<String,Long>>sum=wordAndOneUG.sum(1);// 6. 打印结果sum.print();}}5.2 执行结果
(flink,1) (world,1) (hello,3) (java,1)六、DataStream API实现批处理WordCount(推荐方式)
6.1 流批统一设计理念
从Flink 1.12开始,官方推荐:直接使用DataStream API,通过设置执行模式为BATCH进行批处理。
6.2 代码实现
创建StreamWordCount.java:
packagecom.atguigu.wc;importorg.apache.flink.api.common.typeinfo.Types;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.util.Collector;/** * DataStream API实现WordCount(流批统一) * 读取文件数据(有界流),默认以流模式处理 */publicclassStreamWordCount{publicstaticvoidmain(String[]args)throwsException{// 1. 创建流式执行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取文件(有界数据源)DataStreamSource<String>lineStream=env.readTextFile("input/words.txt");// 3. 转换、分组、求和,得到统计结果SingleOutputStreamOperator<Tuple2<String,Long>>sum=lineStream// 3.1 flatMap:将每行拆分为单词,转换为 (word, 1).flatMap(neworg.apache.flink.api.common.functions.FlatMapFunction<String,Tuple2<String,Long>>(){@OverridepublicvoidflatMap(Stringline,Collector<Tuple2<String,Long>>out)throwsException{String[]words=line.split(" ");for(Stringword:words){out.collect(Tuple2.of(word,1L));}}})// 3.2 返回类型提示(Java泛型擦除问题).returns(Types.TUPLE(Types.STRING,Types.LONG))// 3.3 keyBy:按单词分组(使用Lambda表达式).keyBy(data->data.f0)// 3.4 sum:按第1个字段(索引从0开始,1表示count)求和.sum(1);// 4. 打印结果sum.print();// 5. 执行(DataStream API必须显式调用execute)env.execute();}}6.3 执行结果
3> (java,1) 5> (hello,1) 5> (hello,2) 5> (hello,3) 13> (flink,1) 9> (world,1)结果说明:前面的数字(如
3>、5>)表示输出该结果的并行子任务编号,体现了Flink的并行处理特性。
6.4 与DataSet API的关键差异
| 对比项 | DataSet API(已废弃) | DataStream API(推荐) |
|---|---|---|
| 执行环境 | ExecutionEnvironment | StreamExecutionEnvironment |
| 分组操作 | groupBy(0) | keyBy(data -> data.f0) |
| 返回类型 | 自动推断 | 需显式指定returns() |
| 触发执行 | 懒执行,print()即触发 | 必须调用execute() |
| 并行度 | 默认全局 | 可针对算子设置 |
七、Lambda表达式简化版本
使用Lambda表达式简化flatMap:
packagecom.atguigu.wc;importorg.apache.flink.api.common.typeinfo.Types;importorg.apache.flink.api.java.tuple.Tuple2;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.util.Collector;importjava.util.Arrays;/** * Lambda表达式简化版WordCount */publicclassLambdaWordCount{publicstaticvoidmain(String[]args)throwsException{StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String>lineStream=env.readTextFile("input/words.txt");SingleOutputStreamOperator<Tuple2<String,Long>>sum=lineStream// Lambda表达式实现flatMap.flatMap((Stringline,Collector<Tuple2<String,Long>>out)->{String[]words=line.split(" ");for(Stringword:words){out.collect(Tuple2.of(word,1L));}})// Lambda表达式必须显式声明返回类型!.returns(Types.TUPLE(Types.STRING,Types.LONG)).keyBy(data->data.f0).sum(1);sum.print();env.execute();}}八、批处理模式设置
8.1 代码中设置
importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.api.common.RuntimeExecutionMode;publicclassBatchModeWordCount{publicstaticvoidmain(String[]args)throwsException{StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();// 设置为批处理模式env.setRuntimeMode(RuntimeExecutionMode.BATCH);// 后续处理与流模式完全相同...}}8.2 命令行设置
# 提交作业时指定批处理模式$ bin/flink run -Dexecution.runtime-mode=BATCH FlinkTutorial-1.0-SNAPSHOT.jar8.3 三种执行模式对比
| 模式 | 适用场景 | 特点 |
|---|---|---|
STREAMING | 无界数据流(Kafka、Socket等) | 默认模式,持续处理 |
BATCH | 有界数据流(文件、集合等) | 数据读完即结束 |
AUTOMATIC | 不确定数据源类型 | 自动根据数据源是否有界选择 |
九、项目打包与提交
9.1 添加打包插件
在pom.xml中添加Maven Shade插件,打包可执行jar:
<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformerscombine.children="append"><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/></transformers></configuration></execution></executions></plugin></plugins></build>9.2 打包命令
# 在IDEA中:右侧Maven面板 → Lifecycle → package# 或在终端执行:$ mvn clean package打包完成后,在target目录下生成两个jar:
FlinkTutorial-1.0-SNAPSHOT.jar—— 不包含依赖(推荐,集群已具备依赖)FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar—— 包含所有依赖
9.3 提交到集群
# 启动Flink本地集群$ bin/start-cluster.sh# 提交作业$ bin/flink run-ccom.atguigu.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar# 查看Web UI(默认8081端口)# http://localhost:8081十、常见问题总结
Q1:为什么DataStream API需要调用execute()?
Flink采用**延迟执行(Lazy Execution)**策略:
main()方法中只是定义了作业的执行操作,添加到数据流图中- 此时并没有真正处理数据(数据可能还没来)
execute()触发真正的作业执行,并等待作业完成
Q2:Lambda表达式不加returns()会怎样?
错误信息示例: Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(LambdaWordCount.java:24)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method.Q3:为什么输出结果前面有数字?
数字表示并行子任务的编号(如5>),说明该结果由第5个并行子任务输出。Flink默认并行度为CPU核心数,可以通过以下方式设置:
// 全局设置并行度env.setParallelism(2);// 针对某个算子设置并行度sum.print().setParallelism(1);// 输出并行度设为1,结果有序十一、总结
11.1 本文核心要点
| 要点 | 内容 |
|---|---|
| 项目搭建 | Maven项目 + Flink 1.17依赖(flink-streaming-java + flink-clients) |
| DataSet API | 已废弃,了解即可,不推荐新项目使用 |
| DataStream API | 官方推荐,流批统一,一套API处理两种场景 |
| Lambda简化 | 必须配合returns()解决泛型擦除问题 |
| 延迟执行 | 必须调用env.execute()触发作业执行 |
| 打包提交 | Maven Shade插件打包,通过flink run提交 |
11.2 流批一体的价值
价值:
✅ 降低学习成本:只需掌握一套API
✅ 降低开发成本:相同逻辑无需写两份代码
✅ 降低维护成本:统一升级、统一优化
✅ 保证结果一致:相同逻辑,流和批结果相同
如果本文对你有帮助,欢迎点赞、收藏、关注!有任何问题欢迎在评论区留言讨论。
专栏持续更新中,关注不迷路~ 🚀