【Environment】Production
【OB or other components】OceanBase, Spark 3.1.3, spark-connector-oceanbase-3.4_2.12 v1.3
【Version】OceanBase Enterprise Edition 4.2.1.11
【Problem description】Writing data from Hive into an OceanBase cluster via direct load (bypass import, using the port shown in OCP) fails with: com.oceanbase.spark.writer.v2.DirectLoadWirteBuilderV2 does not support batch write
【Steps to reproduce】Write data from Hive into OceanBase:
SparkSession spark = SparkSession.builder()
        .appName("Hive to OceanBase Direct Load")
        .config("spark.sql.catalog.oceanbase", "com.oceanbase.spark.catalog.OceanBaseCatalog")
        .config("spark.sql.catalog.oceanbase.url", "<JDBC_URL>")
        .config("spark.sql.catalog.oceanbase.username", "<USER>")
        .config("spark.sql.catalog.oceanbase.password", "<PASS>")
        .config("spark.sql.catalog.oceanbase.schema-name", "<SCHEMA>")
        .config("spark.sql.catalog.oceanbase.driver", "com.mysql.cj.jdbc.Driver")
        // Enable OceanBase direct load (bypass import)
        .config("spark.sql.catalog.oceanbase.direct-load.enabled", "true")
        .config("spark.sql.catalog.oceanbase.direct-load.host", "<OBSERVER_IP>")
        .config("spark.sql.catalog.oceanbase.direct-load.rpc-port", "<RPC_PORT>")
        .config("spark.sql.adaptive.enabled", "true")
        .enableHiveSupport()
        .getOrCreate();

try {
    // 2.1 Create the target staging table
    String createDDL = "CREATE TABLE IF NOT EXISTS <SCHEMA>.<TMP_TABLE> (\n" +
            "  id VARCHAR(32) NOT NULL,\n" +
            "  attr1 VARCHAR(100),\n" +
            "  attr2 VARCHAR(100),\n" +
            "  attr3 VARCHAR(100),\n" +
            "  attr4 VARCHAR(10),\n" +
            "  attr5 VARCHAR(20),\n" +
            "  attr6 VARCHAR(200),\n" +
            "  start_date VARCHAR(10),\n" +
            "  end_date VARCHAR(10)\n" +
            ");";
    executeDDL(createDDL);

    // 2.2 Write the data (the INSERT through the catalog triggers the direct load)
    String insertSql = "INSERT INTO oceanbase.<SCHEMA>.<TMP_TABLE> SELECT * FROM <HIVE_DB>.<SRC_TABLE>";
    System.out.println("Writing data via direct load...");
    long t1 = System.currentTimeMillis();
    spark.sql(insertSql);
    long writeTime = System.currentTimeMillis() - t1;
    System.out.printf("Write finished, elapsed: %d ms%n", writeTime);
} catch (Exception e) {
    e.printStackTrace();
} finally {
    spark.stop();
}
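For comparison, I also considered expressing the same write through the DataFrame API instead of the catalog's INSERT path. This is only a sketch: the `oceanbase` format name and the option keys below simply mirror the catalog options in the code above with the `spark.sql.catalog.oceanbase.` prefix dropped, and should be checked against the connector documentation for v1.3 before use.

```java
// Hypothetical alternative: write via the DataFrame API instead of the catalog.
// Option keys are assumed to mirror the catalog config; verify against the
// spark-connector-oceanbase docs for this connector version.
Dataset<Row> src = spark.sql("SELECT * FROM <HIVE_DB>.<SRC_TABLE>");
src.write()
        .format("oceanbase")
        .option("url", "<JDBC_URL>")
        .option("username", "<USER>")
        .option("password", "<PASS>")
        .option("schema-name", "<SCHEMA>")
        .option("table-name", "<TMP_TABLE>")
        // Same direct-load settings as above, without the catalog prefix.
        .option("direct-load.enabled", "true")
        .option("direct-load.host", "<OBSERVER_IP>")
        .option("direct-load.rpc-port", "<RPC_PORT>")
        .mode(SaveMode.Append)
        .save();
```

This sketch requires a running Spark session with the connector on the classpath and a reachable OceanBase cluster, so it is untested here; whether SaveMode.Append is honored under direct load is also an assumption.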