Spark 旁路导入 OceanBase 报错:DirectLoadWriteBuilderV2 does not support batch write

【 使用环境 】生产环境
【 OB or 其他组件 】OceanBase、Spark 3.1.3 、spark-connector-oceanbase-3.4_2.12 v1.3
【 使用版本 】OceanBase 企业版 4.2.1.11
【问题描述】从Hive采用旁路导入写入OceanBase集群中(OCP端口),写入报错com.oceanbase.spark.writer.v2.DirectLoadWirteBuilderV2 does not support batch write
【复现路径】从 Hive写入数据到 OceanBase中

SparkSession spark = SparkSession.builder()
        .appName("Hive to OceanBase Direct Load")
        .config("spark.sql.catalog.oceanbase", "com.oceanbase.spark.catalog.OceanBaseCatalog")
        .config("spark.sql.catalog.oceanbase.url", "<JDBC_URL>")
        .config("spark.sql.catalog.oceanbase.username", "<USER>")
        .config("spark.sql.catalog.oceanbase.password", "<PASS>")
        .config("spark.sql.catalog.oceanbase.schema-name", "<SCHEMA>")
        .config("spark.sql.catalog.oceanbase.driver", "com.mysql.cj.jdbc.Driver")

        // 启用 OceanBase 旁路导入(Direct Load)
        .config("spark.sql.catalog.oceanbase.direct-load.enabled", "true")
        .config("spark.sql.catalog.oceanbase.direct-load.host", "<OBSERVER_IP>")
        .config("spark.sql.catalog.oceanbase.direct-load.rpc-port", "<RPC_PORT>")

        .config("spark.sql.adaptive.enabled", "true")
        .enableHiveSupport()
        .getOrCreate();

try {

    // 2.1 创建目标临时表
    String createDDL = "CREATE TABLE IF NOT EXISTS <SCHEMA>.<TMP_TABLE> (\n" +
            "    id VARCHAR(32) NOT NULL,\n" +
            "    attr1 VARCHAR(100),\n" +
            "    attr2 VARCHAR(100),\n" +
            "    attr3 VARCHAR(100),\n" +
            "    attr4 VARCHAR(10),\n" +
            "    attr5 VARCHAR(20),\n" +
            "    attr6 VARCHAR(200),\n" +
            "    start_date VARCHAR(10),\n" +
            "    end_date VARCHAR(10)\n" 
            ");";
    executeDDL(createDDL);

    // 2.2 执行数据写入(通过 Spark SQL 触发 Direct Load)
    String insertSql = String.format(
            "INSERT INTO oceanbase.<SCHEMA>.<TMP_TABLE> SELECT * FROM <HIVE_DB>.<SRC_TABLE>"
    );

    System.out.println("正在通过 Direct Load 写入数据...");
    long t1 = System.currentTimeMillis();
    spark.sql(insertSql);
    long writeTime = System.currentTimeMillis() - t1;
    System.out.printf("写入完成,耗时:%d ms%n", writeTime);
}

【备注】基于 LLM 和开源文档 RAG 的论坛小助手已开放测试,在发帖时输入 [@论坛小助手] 即可召唤小助手,欢迎试用!

您好,您提的这个技术问题可能牵涉到 OceanBase 企业版范围内的功能细节;针对此类问题,建议你通过以下方式寻求帮助:

  1. 如你所在的企业客户已签署 OceanBase 企业版销售合同,请你联系客户经理;

  2. 如你所在的企业客户尚未签署 OceanBase 企业版销售合同,你可通过OceanBase官网商务咨询页面留下你的联系方式,OceanBase企业版的业务顾问会在一个工作日内与你联系。

另外,我们欢迎你使用社区版,并在论坛/社群中分享你对社区版本的想法、经验和问题,与其他社区成员共同交流。

(小助手的答复已结束,如未能解决您的问题,请继续提问并等待其他同学的回复,谢谢!)

1 个赞