JoyCon-Driver终极指南:在Windows上完美使用Switch控制器
2026/6/7 18:38:14
来源:反编译自
starrocks-cluster-sync-2.0-jar-with-dependencies.jar
反编译工具:CFR 0.152
主类:com.starrocks.sync.SyncJob
Starrocks-跨集群数据迁移工具
StarRocks 跨集群数据迁移工具采用多线程协调架构,通过周期性元数据同步和任务队列管理实现自动化数据迁移。
┌─────────────────────────────────────────────────────────────┐ │ 迁移工具(客户端) │ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │ │ meta-handler │ │ ddl-handler │ │replication- │ │ │ │ │ │ │ │job-handler │ │ │ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │ │ │ │ │ │ │ └─────────────────┼─────────────────┘ │ │ │ │ │ ┌──────▼───────┐ │ │ │sync-reporter │ │ │ └──────────────┘ │ └───────────────────────────┬─────────────────────────────────┘ │ ┌───────────────────┼───────────────────┐ │ │ │ ▼ ▼ ▼ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ 源集群 FE │ │ 目标集群 FE │ │ 目标集群 BE │ │ (元数据查询)│ │ (RPC 调用) │ │ (数据接收) │ └──────┬───────┘ └──────────────┘ └──────┬───────┘ │ │ └────────────────────────────────────────┘ │ HTTP 快照传输 ▼ ┌──────────────┐ │ 源集群 BE │ │ (数据提供) │ └──────────────┘SyncJob(){1.读取配置文件(sync.properties,hosts.properties)2.验证必要配置(source_cluster_token 等)3.初始化任务队列4.创建ClusterMetaKeeper(元数据管理器)}start(){启动4个工作线程:1.meta-handler(元数据处理线程)-周期性更新源集群和目标集群的元数据-生成 DDL 任务-生成数据复制任务2.ddl-handler(DDL 执行线程)-从 DDL 队列取出任务-批量执行 DDL(CREATE/DROP TABLE/PARTITION 等)3.replication-job-handler(数据复制线程)-从复制任务队列取出任务-通过 RPC 发送到目标集群 FE-更新任务状态4.sync-reporter(进度报告线程)-查询事务状态-统计任务进度-输出日志报告}循环执行(直到手动停止): ├─ meta-handler: 更新元数据 → 生成任务 ├─ ddl-handler: 执行 DDL 任务 ├─ replication-job-handler: 发送复制任务 └─ sync-reporter: 报告进度updateClusterMeta(){并行执行:1.同步源集群元数据-FE 节点信息-BE 节点信息-数据库、表、分区、索引信息-版本信息2.同步目标集群元数据-FE 节点信息-BE 节点信息-数据库、表、分区、索引信息-版本信息-事务状态(运行中/已完成)}produceDDL(){比较源集群和目标集群的元数据差异:1.handleDbDDL()-源集群有,目标集群没有 → 创建数据库-目标集群有,源集群没有 → 删除数据库(可选)2.handleTableDDL()-源集群有,目标集群没有 → 创建表-目标集群有,源集群没有 → 删除表(可选)-表结构不一致 → 删除并重建(可选)3.handlePartitionDDL()-源集群有,目标集群没有 → 添加分区-目标集群有,源集群没有 → 删除分区(可选)-版本不一致 → 删除并重建(可选)4.handleMaterializedViewDDL()-同步物化视图结构(不同步数据)5.handleViewDDL()-同步视图定义}produceReplicationJob(){遍历公共数据库和表:for(数据库 in 公共数据库){for(表 in 公共表){for(分区 in 公共分区){1.比较分区版本-源版本>目标版本 → 需要同步-源版本=目标版本 → 检查版本时间(存算分离)-源版本<目标版本 → 删除目标分区2.收集需要同步的分区信息-分区 ID-源版本号-索引信息-Tablet映射关系-源 BE 节点信息3.检查数据量限制-如果超过 max_replication_data_size_per_job_in_gb-停止添加分区,标记为部分复制}4.创建ReplicationJob-包含所有需要同步的分区-添加到复制任务队列}}}DDL 任务队列 ↓ 批量取出(ddlJobBatchSize) ↓ 执行 SQL(CREATE/DROP/ALTER) ↓ 更新元数据复制任务队列 ↓ 批量取出(replicationJobBatchSize) ↓ 转换为 Thrift 请求 ↓ RPC 调用目标集群 FE ↓ 目标 FE 创建事务并协调 BE ↓ 目标 BE 从源 BE 拉取数据 ↓ 查询事务状态跟踪进度JobState{UNKNOWN,// 未知状态INIT,// 已发送到目标集群SENT,// 已发送但未开始运行SENT_FAILED,// 发送失败RUNNING,// 正在运行FAILED,// 失败FINISHED// 完成}getTxnStatus(jobToken){1.解析 jobToken →{dbId}-{tableId}_{jobId}2.查询运行中的事务 SHOW PROC'/transactions/{dbId}/running'-如果找到 → RUNNING3.查询已完成的事务 SHOW PROC'/transactions/{dbId}/finished'-如果找到且状态为 VISIBLE → FINISHED-如果找到且状态为 ABORTED → FAILED4.都找不到 → UNKNOWN}核心结论:数据传输采用BE 到 BE 的直接快照复制机制,通过HTTP 协议传输数据。
不是 JDBC:JDBC 仅用于元数据查询(SHOW PROC、SHOW DATA 等)
不是 StreamLoad:StreamLoad 用于外部数据导入,不是集群间复制
┌─────────────────────────────────────────────────────────────┐ │ 迁移工具 │ │ 生成 ReplicationJob(包含源 BE 信息) │ └──────────────────────┬──────────────────────────────────────┘ │ Thrift RPC ▼ ┌─────────────────────────────────────────────────────────────┐ │ 目标集群 FE │ │ 1. 接收 startTableReplication 请求 │ │ 2. 验证 Token 和权限 │ │ 3. 创建事务(LoadJobSourceType = 'REPLICATION') │ │ 4. 将任务分发给目标 BE 节点 │ └──────────────────────┬──────────────────────────────────────┘ │ 任务分发 ▼ ┌─────────────────────────────────────────────────────────────┐ │ 目标集群 BE │ │ 1. 接收复制任务 │ │ 2. 解析源 BE 信息(host, httpPort) │ │ 3. 通过 HTTP 协议连接源 BE │ │ 4. 请求快照数据(包含 srcTabletId, version, schemaHash) │ │ 5. 接收快照数据并写入本地存储 │ │ 6. 报告写入完成 │ └──────────────────────┬──────────────────────────────────────┘ │ HTTP 协议(快照下载) ▼ ┌─────────────────────────────────────────────────────────────┐ │ 源集群 BE │ │ 1. 接收快照下载请求 │ │ 2. 验证 Token(从请求中获取) │ │ 3. 根据版本号生成快照 │ │ 4. 通过 HTTP 协议传输快照数据 │ └─────────────────────────────────────────────────────────────┘// 在 produceReplicationJob() 中ReplicationJobreplicationJob=newReplicationJob(jobId,// 任务 IDtargetUserName,// 目标集群用户名targetPassword,// 目标集群密码sourceToken,// 源集群 Token(关键!)targetDbId,// 目标数据库 IDtargetTableId,// 目标表 IDdbName,// 数据库名tableName,// 表名srcTableType,// 源表类型srcTableDataSize,// 源表数据大小partitionInfos// 分区信息列表);// 分区信息包含:PartitionInfo{partitionId,// 目标分区 IDsrcVersion,// 源分区版本(关键!)indexInfos// 索引信息列表}// 索引信息包含:IndexInfo{indexId,// 目标索引 IDsrcSchemaHash,// 源 Schema HashtabletInfos// Tablet 信息列表}// Tablet 信息包含:TabletInfo{tabletId,// 目标 Tablet IDsrcTabletId,// 源 Tablet ID(关键!)replicaInfos// 副本信息列表}// 副本信息包含:ReplicaInfo{srcBackend{// 源 BE 节点信息(关键!)host,// 源 BE 主机地址bePort,// BE 服务端口httpPort// HTTP 端口(用于数据传输)}}// 在 Utils.sendReplicationJob() 中1.随机选择目标集群 FE 节点2.将ReplicationJob转换为TTableReplicationRequest3.通过ThriftRPC 调用:FrontendServiceProxy.call(address,// 目标 FE 地址10000,// 超时时间(10秒)3,// 重试次数client->client.startTableReplication(request))// 目标集群 FE 接收到请求后(FE 端逻辑,不在工具代码中)1.验证请求-检查用户名和密码-验证源集群Token-检查并发限制(replication_max_parallel_table_count 等)2.创建事务-事务Label:{dbId}-{tableId}_{jobId}-LoadJobSourceType:'REPLICATION'-事务状态:PREPARE3.分发任务到目标 BE 节点-根据Tablet分布信息-将任务发送到对应的 BE 节点// 目标 BE 节点接收到任务后(BE 端逻辑,不在工具代码中)1.解析任务信息-获取源 BE 信息(host,httpPort)-获取源TabletID-获取版本号和SchemaHash2.建立 HTTP 连接-连接到源 BE 的 httpPort(默认8040)-使用源集群Token进行认证3.请求快照数据 HTTPRequest:-URL:http://{srcBeHost}:{httpPort}/api/snapshot/download-Method:POST-Headers:*Authorization:Token{sourceToken}-Body:{"tablet_id":srcTabletId,"version":srcVersion,"schema_hash":srcSchemaHash,"target_tablet_id":tabletId}4.源 BE 处理请求-验证Token-根据版本号生成快照-如果快照为空,返回错误:"Source snapshots is empty"-否则,通过 HTTP 流式传输快照数据5.目标 BE 接收数据-接收快照文件-验证数据完整性(Checksum)-写入本地存储-更新Tablet元数据// 所有副本写入完成后1.目标 BE 报告写入完成2.FE 提交事务-事务状态:COMMITTED3.FE 发布数据-事务状态:PUBLISHED-数据变为可见(VISIBLE)4.FE 完成事务-事务状态:FINISHED// 迁移工具定期查询事务状态1.查询运行中的事务 SHOW PROC'/transactions/{dbId}/running'-如果找到对应的事务 → RUNNING2.查询已完成的事务 SHOW PROC'/transactions/{dbId}/finished'-如果找到且状态为 VISIBLE → FINISHED-如果找到且状态为 ABORTED → FAILED3.更新任务状态-更新 replicationJobState-更新 replicationTableStatus-统计进度JDBC 的特点:
为什么不适合:
性能问题:
功能限制:
资源消耗:
代码证据:
// 工具中 JDBC 仅用于元数据查询Utils.execQuerySql("SHOW PROC '/dbs/'",...);Utils.execQuerySql("SHOW DATA",...);Utils.execQuerySql("SHOW PARTITIONS",...);// 没有用于数据传输的 JDBC 调用StreamLoad 的特点:
为什么不适合:
数据来源:
数据格式:
一致性:
代码证据:
// 请求中包含的是元数据信息,不是数据本身TTableReplicationRequest{src_tablet_id,// 源 Tablet IDsrc_version,// 源版本号src_backend,// 源 BE 信息// 没有数据内容}优势分析:
性能优势:
✅ 直接传输二进制数据,无需格式转换 ✅ BE 到 BE 直连,减少网络跳数 ✅ 支持并行传输,提高吞吐量 ✅ 基于版本快照,支持增量同步一致性保证:
✅ 基于版本号,保证数据版本一致性 ✅ 快照是原子操作,要么全部成功要么全部失败 ✅ 支持事务机制,保证数据完整性资源效率:
✅ 不占用 FE 查询资源 ✅ 不执行查询计划,减少 CPU 消耗 ✅ 直接传输存储格式,减少内存消耗灵活性:
✅ 支持指定版本号复制特定版本的数据 ✅ 支持增量同步(只传输版本差异) ✅ 支持断点续传(快照机制)安全性:
✅ 使用 Token 进行认证 ✅ BE 到 BE 直连,减少中间环节 ✅ 支持网络地址映射,适应复杂网络环境代码证据:
// 1. 请求中包含源 BE 信息ReplicationJob.BackendInfo{host,// 源 BE 主机地址bePort,// BE 服务端口httpPort// HTTP 端口(用于数据传输)}// 2. 请求中包含版本信息PartitionInfo{srcVersion// 源分区版本号}// 3. 错误信息显示使用快照机制if(errorMsg.contains("Source snapshots is empty")){// 说明使用快照机制}// 版本比较逻辑if(srcPartition.getVisibleVersion()>targetPartition.getVisibleVersion()){// 需要同步:目标 BE 请求指定版本号的快照request.version=srcPartition.getVisibleVersion();}作用:
// 快照请求{"tablet_id":srcTabletId,// 源 Tablet ID"version":srcVersion,// 版本号"schema_hash":srcSchemaHash,// Schema Hash"target_tablet_id":tabletId// 目标 Tablet ID}特点:
// 请求中包含源集群 TokenTTableReplicationRequest{src_token:"wwwwwwww-xxxx-yyyy-zzzz-uuuuuuuuuu"}// 目标 BE 使用 Token 访问源 BEHTTPRequestHeaders:Authorization:Token{src_token}作用:
// FE 配置参数replication_max_parallel_table_count// 最大并发表数replication_max_parallel_replica_count// 最大并发副本数replication_max_parallel_data_size_mb// 最大并发数据量作用:
┌─────────────────────────────────────────────────────────────┐ │ 迁移工具 │ │ 1. 生成 ReplicationJob │ │ - 包含源 BE 信息(host, httpPort) │ │ - 包含 Tablet 映射(srcTabletId → tabletId) │ │ - 包含版本信息(srcVersion) │ │ - 包含 Token(srcToken) │ └──────────────────────┬──────────────────────────────────────┘ │ Thrift RPC ▼ ┌─────────────────────────────────────────────────────────────┐ │ 目标集群 FE │ │ 1. 验证请求(Token、权限) │ │ 2. 检查并发限制 │ │ 3. 创建事务(Label: {dbId}-{tableId}_{jobId}) │ │ 4. 分发任务到目标 BE 节点 │ └──────────────────────┬──────────────────────────────────────┘ │ 任务分发 ▼ ┌─────────────────────────────────────────────────────────────┐ │ 目标集群 BE │ │ 1. 接收任务 │ │ 2. 解析源 BE 信息 │ │ 3. 建立 HTTP 连接 │ │ 4. 发送快照下载请求 │ │ POST http://{srcBeHost}:{httpPort}/api/snapshot/download │ │ Headers: Authorization: Token {srcToken} │ │ Body: {tablet_id, version, schema_hash, target_tablet_id}│ └──────────────────────┬──────────────────────────────────────┘ │ HTTP 协议 ▼ ┌─────────────────────────────────────────────────────────────┐ │ 源集群 BE │ │ 1. 验证 Token │ │ 2. 根据版本号生成快照 │ │ 3. 通过 HTTP 流式传输快照数据 │ │ 4. 返回数据(或错误:"Source snapshots is empty") │ └──────────────────────┬──────────────────────────────────────┘ │ HTTP Response(快照数据) ▼ ┌─────────────────────────────────────────────────────────────┐ │ 目标集群 BE │ │ 1. 接收快照数据 │ │ 2. 验证数据完整性(Checksum) │ │ 3. 写入本地存储 │ │ 4. 更新 Tablet 元数据 │ │ 5. 报告写入完成 │ └──────────────────────┬──────────────────────────────────────┘ │ 完成报告 ▼ ┌─────────────────────────────────────────────────────────────┐ │ 目标集群 FE │ │ 1. 接收所有副本的完成报告 │ │ 2. 提交事务(COMMITTED) │ │ 3. 发布数据(PUBLISHED → VISIBLE) │ │ 4. 完成事务(FINISHED) │ └──────────────────────┬──────────────────────────────────────┘ │ 状态查询 ▼ ┌─────────────────────────────────────────────────────────────┐ │ 迁移工具 │ │ 1. 查询事务状态 │ │ SHOW PROC '/transactions/{dbId}/running' │ │ SHOW PROC '/transactions/{dbId}/finished' │ │ 2. 更新任务状态 │ │ - RUNNING / FINISHED / FAILED │ │ 3. 统计进度 │ └─────────────────────────────────────────────────────────────┘replicationJobBatchSize控制| 决策 | 原因 |
|---|---|
| BE 到 BE 直连 | 减少网络跳数,提高性能 |
| HTTP 协议 | 成熟稳定,支持大文件传输 |
| 快照机制 | 保证数据一致性,支持增量同步 |
| Token 认证 | 保证安全性,简化认证流程 |
| 版本控制 | 支持增量同步,避免重复传输 |
文档生成时间:2025-01-XX
反编译工具:CFR 0.152
JAR 版本:starrocks-cluster-sync-2.0