Questions about several issues encountered with the Flink OceanBase CDC connector

【 Test environment 】
【 OB 】
【 4.2.0 】
【Problem description】
1. The Flink job restarts frequently: java.util.concurrent.TimeoutException: Timeout to receive messages in RecordListener
at com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.readChangeRecords(OceanBaseRichSourceFunction.java:374)
at com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.run(OceanBaseRichSourceFunction.java:158)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:60)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)

2. The following exception is thrown:
java.lang.RuntimeException: Chunk reader exception
at com.ververica.cdc.connectors.oceanbase.source.OceanBaseSnapshotChunkSplitter.checkException(OceanBaseSnapshotChunkSplitter.java:162)
at com.ververica.cdc.connectors.oceanbase.source.OceanBaseSnapshotChunkSplitter.split(OceanBaseSnapshotChunkSplitter.java:121)
at com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.readSnapshotRecords(OceanBaseRichSourceFunction.java:335)
at com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.run(OceanBaseRichSourceFunction.java:225)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:60)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1388)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.flushResolvedChunkCache(OceanBaseRichSourceFunction.java:380)
at com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.lambda$getCheckpointListener$2(OceanBaseRichSourceFunction.java:363)
at com.ververica.cdc.connectors.oceanbase.source.OceanBaseSnapshotChunkSplitter.lambda$new$0(OceanBaseSnapshotChunkSplitter.java:77)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

For the first issue, try increasing connect.timeout. The second issue looks like a bug; I have pushed a new commit with a fix, which you can pull and try: GitHub - whhe/flink-cdc-connectors at oceanbase
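
For context on the second issue: the trace shows a ConcurrentModificationException raised while flushResolvedChunkCache collects a stream over an ArrayList, which usually means another thread changed the list during the traversal. Below is a minimal sketch of one common way to avoid that pattern. It is not the actual fix in the commit mentioned above; the class ResolvedChunkCacheSketch and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration, not code from the connector.
class ResolvedChunkCacheSketch {
    private final Object lock = new Object();
    private List<String> cache = new ArrayList<>();

    // Called by reader threads; every mutation happens under the lock.
    void add(String record) {
        synchronized (lock) {
            cache.add(record);
        }
    }

    // Swaps the shared list for a fresh one under the lock, then streams
    // the detached list, which no other thread can modify any more.
    List<String> flush() {
        List<String> snapshot;
        synchronized (lock) {
            snapshot = cache;
            cache = new ArrayList<>();
        }
        return snapshot.stream()
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws InterruptedException {
        ResolvedChunkCacheSketch sketch = new ResolvedChunkCacheSketch();
        Thread writer = new Thread(() -> {
            for (int i = 0; i < 100_000; i++) {
                sketch.add("r" + i);
            }
        });
        writer.start();

        int flushed = 0;
        while (writer.isAlive()) {
            flushed += sketch.flush().size(); // flush while the writer is still adding
        }
        writer.join();
        flushed += sketch.flush().size();     // pick up anything left after the writer ends
        System.out.println("flushed=" + flushed); // prints "flushed=100000"
    }
}
```

Streaming the shared list directly while the writer thread is still adding to it is the pattern that can fail as in the trace; detaching the list under a lock keeps the traversal and the writes apart.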

If I increase the timeout, the error still occurs whenever no new data arrives within that window. Is there a way to avoid the error?

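This timeout does not mean there is no incremental data; it means the connection was never established. Once the link is up, HEARTBEAT messages are sent to the client continuously. If it always times out, the client or oblogproxy configuration is probably wrong, so check the logs to investigate.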
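
To make the distinction concrete, here is a rough sketch of the idea described above: the wait is satisfied by any message, including a HEARTBEAT, so an idle but healthy link does not time out, while a link that never comes up does. This is not the connector's real implementation; the class HeartbeatTimeoutSketch, the Type enum and awaitFirstMessage are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration, not code from the connector.
class HeartbeatTimeoutSketch {
    enum Type { HEARTBEAT, DATA }

    // Waits for the first message of any type. A HEARTBEAT counts,
    // so "no new rows" alone never triggers the timeout.
    static Type awaitFirstMessage(BlockingQueue<Type> messages, long timeoutMillis)
            throws InterruptedException, TimeoutException {
        Type first = messages.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        if (first == null) {
            throw new TimeoutException("Timeout to receive messages in RecordListener");
        }
        return first;
    }

    public static void main(String[] args) throws Exception {
        // Case 1: link is up but the table is idle; only a heartbeat arrives.
        BlockingQueue<Type> connected = new LinkedBlockingQueue<>();
        connected.add(Type.HEARTBEAT);
        System.out.println("connected, idle: " + awaitFirstMessage(connected, 100));

        // Case 2: link never came up; nothing arrives at all.
        BlockingQueue<Type> notConnected = new LinkedBlockingQueue<>();
        try {
            awaitFirstMessage(notConnected, 100);
        } catch (TimeoutException e) {
            System.out.println("not connected: " + e.getMessage());
        }
    }
}
```

Under this model, raising the timeout only helps when the link is slow to come up; if no message of any kind ever arrives, the configuration needs checking instead.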

OK. Also, which is recommended, binlog mode or CDC mode? Which one performs better?