Flink CDC + oblogproxy + OB Oracle mode: error when parsing table schema

【Environment】Production
【OB or other component】flink-sql-connector-oceanbase-cdc-3.2.0.jar
【Version】Enterprise Edition 4.x
【Problem description】When Flink CDC syncs data through the proxy, it fails with:
Caused by: io.debezium.DebeziumException: Failed to set field default value for 'TRADEDBADM.RPMTINVD.MERC_NTC_CMBL' of type NUMBER, the default value is 0 of type class java.lang.String
The column is defined in the table DDL as:
"MERC_NTC_CMBL" NUMBER(12) DEFAULT 0 NOT NULL ENABLE
Other tables we sync also contain NUMBER(12) columns and have no problem. We suspect that when the Flink job starts, it parses the OB table schema and mishandles the default value of this column; the suspicion is supported by the fact that the same error is still reported after we exclude this column from the sync.

【Steps to reproduce】
1. Comparing columns of the same type, all columns defined without DEFAULT 0 sync normally.
2. After removing the problematic MERC_NTC_CMBL column from the sync, the same error is still reported.
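The last `Caused by` in the log below shows the root cause: Kafka Connect's `ConnectSchema.validateValue` accepts only `java.lang.Long` as the default value of an INT64 schema, while the connector hands it the String "0". A minimal, stdlib-only sketch of that check (mimicking, not reproducing, the shaded Connect code) is:

```java
// Illustration of the type check that fails inside the shaded
// Kafka Connect classes; this mimics ConnectSchema.validateValue
// for the INT64 case only, it is NOT the real Connect code.
public class DefaultValueCheck {

    // INT64 schemas only accept java.lang.Long default values.
    static void validateInt64Default(Object defaultValue) {
        if (!(defaultValue instanceof Long)) {
            throw new RuntimeException(
                    "Invalid Java object for schema with type INT64: class "
                            + defaultValue.getClass().getName());
        }
    }

    public static void main(String[] args) {
        validateInt64Default(0L); // passes: a Long matches INT64

        try {
            // OB's "DEFAULT 0" reaches the schema builder as the String "0":
            validateInt64Default("0");
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```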

【Attachments and logs】Detailed error log of the Flink job:
2025-06-24 15:29:26

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:285)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:276)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:269)
    at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:764)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:741)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
    at jdk.internal.reflect.GeneratedMethodAccessor92.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
    at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
    at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
    at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
    at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
    at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
    at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
    at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
    at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
    at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: io.debezium.DebeziumException: Failed to set field default value for 'TRADEDBADM.RPMTINVD.MERC_NTC_CMBL' of type NUMBER, the default value is 0 of type class java.lang.String
    at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:421)
    at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:149)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
    at io.debezium.relational.TableSchemaBuilder.create(TableSchemaBuilder.java:147)
    at io.debezium.relational.RelationalDatabaseSchema.buildAndRegisterSchema(RelationalDatabaseSchema.java:135)
    at io.debezium.relational.RelationalDatabaseSchema.refreshSchema(RelationalDatabaseSchema.java:209)
    at io.debezium.relational.RelationalDatabaseSchema.refresh(RelationalDatabaseSchema.java:200)
    at org.apache.flink.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.getTableSchema(OceanBaseRichSourceFunction.java:305)
    at org.apache.flink.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.lambda$readSnapshotRecordsByTable$2(OceanBaseRichSourceFunction.java:334)
    at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:555)
    at io.debezium.jdbc.JdbcConnection.query(JdbcConnection.java:496)
    at org.apache.flink.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.readSnapshotRecordsByTable(OceanBaseRichSourceFunction.java:331)
    at java.base/java.lang.Iterable.forEach(Iterable.java:75)
    at org.apache.flink.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.readSnapshotRecords(OceanBaseRichSourceFunction.java:310)
    at org.apache.flink.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.run(OceanBaseRichSourceFunction.java:190)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:114)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:71)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:338)
Caused by: org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.SchemaBuilderException: Invalid default value
    at org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:131)
    at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:415)
    ... 25 more
Caused by: org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: Invalid Java object for schema with type INT64: class java.lang.String
    at org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:242)
    at org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.ConnectSchema.validateValue(ConnectSchema.java:213)
    at org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.SchemaBuilder.defaultValue(SchemaBuilder.java:129)
    ... 26 more

Hello, the technical question you raised involves functionality within the scope of OceanBase Enterprise Edition. For questions of this kind, we suggest seeking help through the following channels:

  1. If your company has already signed an OceanBase Enterprise Edition sales contract, please contact your account manager;

  2. If your company has not yet signed an OceanBase Enterprise Edition sales contract, you can leave your contact information on the business inquiry page of the OceanBase official website, and an OceanBase Enterprise Edition business consultant will contact you within one business day.

In addition, we welcome you to use the Community Edition and to share your thoughts, experience, and questions about it in the forum and community groups with other community members.

I took a look: the current community-edition OB CDC connector is not developed on top of Debezium; it only reuses Debezium's RelationalDatabaseSchema structure during initialization, so this should be unrelated to any Debezium bug.

It looks like the default value returned by OB may deviate from the standard JDBC format, for example wrapping the numeric 0 in quotes as '0', which causes the parsing to fail.
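If one wanted to keep the default instead of dropping it, the raw value would first have to be normalized before being handed to the schema builder, e.g. stripping the quotes OB may add and parsing the remainder into the numeric type. A hypothetical helper (illustration only, not connector code; the exact raw format OB returns is an assumption here) might look like:

```java
// Hypothetical normalization of a raw column default as it might come
// back from JDBC metadata (COLUMN_DEF), e.g. "'0'" or "0", into the
// Long an INT64 field expects. Illustration only, not connector code.
public class DefaultValueNormalizer {

    static Long normalizeToLong(String raw) {
        if (raw == null) {
            return null; // no default defined
        }
        String s = raw.trim();
        // Strip a single pair of surrounding quotes, e.g. '0' -> 0
        if (s.length() >= 2 && s.startsWith("'") && s.endsWith("'")) {
            s = s.substring(1, s.length() - 1);
        }
        return Long.valueOf(s);
    }

    public static void main(String[] args) {
        System.out.println(normalizeToLong("'0'")); // prints 0
        System.out.println(normalizeToLong("0"));   // prints 0
    }
}
```

The connector maintainers chose the simpler route below: since the default is unused anyway, it is safer to drop it than to guess at OB's quoting rules.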

Since OceanBase CDC does not support capturing DDL events either, the default value is actually unused here, so it can simply be dropped.

If you can build the package yourself, a patch like the following will do:

Subject: [PATCH] Omit default value in OceanBase CDC
---
Index: flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/connection/OceanBaseConnection.java
===================================================================
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/connection/OceanBaseConnection.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/connection/OceanBaseConnection.java
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/connection/OceanBaseConnection.java	(revision 35504bddaa531ea7d9893bb73b8f410be9f5ea54)
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/main/java/org/apache/flink/cdc/connectors/oceanbase/source/connection/OceanBaseConnection.java	(date 1755141417135)
@@ -259,7 +259,9 @@
                                     column -> {
                                         columnsByTable
                                                 .computeIfAbsent(tableId, t -> new ArrayList<>())
-                                                .add(column.create());
+                                                .add(column
+                                                        .unsetDefaultValueExpression()
+                                                        .create());
                                     });
                 }
             }