Flink CDC syncing from OceanBase to MySQL: oblogproxy reports "Failed to query observer: Table 'oceanbase.__all_virtual_server_clog_stat'"

【Environment】Production
【OB or other components】oblogproxy
【Versions】
oblogproxy-ce-for-4x-1.1.3-20230804144645
OceanBase_CE 4.1.0.0 (r100000202023040520-0765e69043c31bf86e83b5d618db0530cf31b707)
flink-1.15.3-bin-scala_2.12
flink-sql-connector-oceanbase-cdc-2.2.0.jar
flink-connector-jdbc-1.15.3.jar
mysql-connector-java-5.1.47.jar

【Problem description】Flink CDC syncs data from OceanBase to MySQL, and oblogproxy reports "Failed to query observer: Table 'oceanbase.__all_virtual_server_clog_stat'", so Flink cannot capture incremental data from OceanBase.

【Reproduction steps】

1. Extract oblogproxy:

Using the official oblogproxy-ce-for-4x-1.1.3-20230804144645.tar.gz package.
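A sketch of this step under the directory layout assumed by the LD_LIBRARY_PATH setting below (the /data/oblogproxy target path is inferred from that variable, not stated in the original steps):

# extract the package and move it to the path referenced by LD_LIBRARY_PATH
tar -xzf oblogproxy-ce-for-4x-1.1.3-20230804144645.tar.gz
mv <extracted-directory> /data/oblogproxy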

2. Configure environment variables

Added to /etc/profile:
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/data/oblogproxy/liboblog

3. Generate the encrypted user and the corresponding password

./bin/logproxy -x root@sys
./bin/logproxy -x <password>

4. Configure conf.json (a minimal sketch follows)
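A minimal sketch of the credential fields in conf.json, assuming the encrypted strings printed by the two -x commands above are pasted in; the field names ob_sys_username, ob_sys_password and service_port match the config dump shown later in this post, and all other fields keep their packaged defaults:

{
  "ob_sys_username": "<encrypted string printed for the username>",
  "ob_sys_password": "<encrypted string printed for the password>",
  "service_port": 2983
}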

5. Create the sync task in Flink SQL


CREATE TABLE ob_tbl1 (
  col1 INT PRIMARY KEY NOT ENFORCED,
  col2 VARCHAR(20),
  col3 INT)
WITH ('connector' = 'oceanbase-cdc',
  'scan.startup.mode' = 'initial',
  'tenant-name' = 'tenant_oa',
  'username' = 'oatest@tenant_oa',
  'password' = 'succez3.14',
  'database-name' = 'oatest',
  'table-name' = 'tbl1',
  'hostname' = '.171',
  'port' = '2883',
  'rootserver-list' = '.171:2882:2881',
  'logproxy.host' = '***.171',
  'logproxy.port' = '2983');
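For completeness, a hedged sketch of the MySQL side of the sync using the flink-connector-jdbc and mysql-connector-java jars listed above; the sink table name, JDBC URL and credentials below are placeholders, not values from this environment:

CREATE TABLE mysql_tbl1 (
  col1 INT PRIMARY KEY NOT ENFORCED,
  col2 VARCHAR(20),
  col3 INT)
WITH ('connector' = 'jdbc',
  'url' = 'jdbc:mysql://<mysql-host>:3306/<database>',
  'driver' = 'com.mysql.jdbc.Driver',
  'table-name' = 'tbl1',
  'username' = '<mysql-user>',
  'password' = '<mysql-password>');

INSERT INTO mysql_tbl1 SELECT * FROM ob_tbl1;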


【Symptoms and impact】
Execute in the Flink SQL client:
Flink SQL> select * from ob_tbl1;
The oblogproxy ./log/out.log reports the following errors:

accept_interval_us:500000,
allow_all_tenant:1,
auth_allow_sys_user:1,
auth_use_rs:0,
auth_user:1,
builtin_cluster_url_prefix:,
channel_type:plain,
check_quota_enable:0,
command_timeout_s:10,
communication_mode:server,
count_record:0,
counter_interval_s:2,
debug:0,
encode_queue_size:20000,
encode_threadpool_size:8,
liboblog_tls:0,
liboblog_tls_cert_path:,
log_gc_interval_s:43200,
log_quota_day:7,
log_quota_size_mb:5120,
log_to_stdout:0,
max_cpu_ratio:0,
max_mem_quota_mb:0,
max_packet_bytes:67108864,
metric_enable:1,
metric_interval_s:10,
node_cpu_limit_threshold_percent:90,
node_disk_limit_threshold_percent:85,
node_mem_limit_minimum_mb:2048,
node_mem_limit_threshold_percent:85,
ob_clog_expr_s:43200,
ob_clog_fetch_interval_s:600,
ob_sys_password:,
ob_sys_username:,
oblogreader_lease_s:300,
oblogreader_max_count:100,
oblogreader_path:./run,
oblogreader_path_retain_hour:168,
packet_magic:1,
process_name_address:140733387862965,
read_fail_interval_us:1000000,
read_timeout_us:2000000,
read_wait_num:20000,
readonly:0,
record_queue_size:20000,
send_fail_interval_us:1000000,
send_timeout_us:2000000,
service_port:2983,
tls_ca_cert_file:,
tls_cert_file:,
tls_key_file:,
tls_verify_peer:1,
verbose:0,
verbose_packet:0,
E20231025 18:25:36.463414 418366 mysql_protocol.cpp:239] Failed to query observer:Table ‘oceanbase.__all_virtual_server_clog_stat’ doesn’t exist, unexpected column count: 0
E20231025 18:25:36.463585 418366 clog_meta_routine.cpp:45] Failed to check the existence of svr_min_log_timestamp column in __all_virtual_server_clog_stat, disable clog check
E20231025 18:25:36.514278 422174 mysql_protocol.cpp:239] Failed to query observer:Table ‘oceanbase.__all_virtual_server_clog_stat’ doesn’t exist, unexpected column count: 0
E20231025 18:25:36.514469 422174 clog_meta_routine.cpp:45] Failed to check the existence of svr_min_log_timestamp column in __all_virtual_server_clog_stat, disable clog check

Flink error log:
tail -f /data/flink-1.15.3/log/flink-root-taskexecutor-0-node1.log

2023-10-25 19:16:54,460 WARN com.oceanbase.clogproxy.client.connection.ClientStream [] - start to reconnect…
2023-10-25 19:16:54,462 WARN com.oceanbase.clogproxy.client.connection.ClientStream [] - reconnect SUCC
2023-10-25 19:16:54,462 INFO com.oceanbase.clogproxy.client.connection.ClientHandler [] - ClientId: 192.168.10.171_190263_1698232614tenant_oa.oatest.tbl1: rootserver_list=.171:2882:2881, cluster_user=oatest@tenant_oa, cluster_password=***, tb_white_list=tenant_oa.oatest.tbl1, start_timestamp=0 connecting LogProxy: ***.171:2983
2023-10-25 19:16:54,967 INFO com.oceanbase.clogproxy.client.connection.ClientHandler [] - Connected to LogProxyServer, ip:127.0.0.1, version:f136f77be0d1ad50ff7053e5d1592c3d40696094
2023-10-25 19:16:57,250 INFO com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction [] - snapshotState checkpoint: 1 at resolvedTimestamp: -1
2023-10-25 19:16:57,356 INFO org.apache.flink.streaming.api.operators.collect.CollectSinkFunction [] - Coordinator connection received
2023-10-25 19:16:57,357 INFO org.apache.flink.streaming.api.operators.collect.CollectSinkFunction [] - Invalid request. Received version = , offset = 0, while expected version = a59107ab-5eac-434e-9883-c07efece5bdc, offset = 0
2023-10-25 19:17:00,249 INFO com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction [] - snapshotState checkpoint: 2 at resolvedTimestamp: -1
2023-10-25 19:17:03,250 INFO com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction [] - snapshotState checkpoint: 3 at resolvedTimestamp: -1
2023-10-25 19:17:06,250 INFO com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction [] - snapshotState checkpoint: 4 at resolvedTimestamp: -1
2023-10-25 19:17:09,249 INFO com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction [] - snapshotState checkpoint: 5 at resolvedTimestamp: -1
2023-10-25 19:17:12,250 INFO com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction [] - snapshotState checkpoint: 6 at resolvedTimestamp: -1
2023-10-25 19:17:15,250 INFO com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction [] - snapshotState checkpoint: 7 at resolvedTimestamp: -1
2023-10-25 19:17:18,250 INFO com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction [] - snapshotState checkpoint: 8 at resolvedTimestamp: -1
2023-10-25 19:17:21,250 INFO com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction [] - snapshotState checkpoint: 9 at resolvedTimestamp: -1
2023-10-25 19:17:24,250 INFO com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction [] - snapshotState checkpoint: 10 at resolvedTimestamp: -1
2023-10-25 19:17:27,249 INFO com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction [] - snapshotState checkpoint: 11 at resolvedTimestamp: -1
2023-10-25 19:17:30,250 INFO com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction [] - snapshotState checkpoint: 12 at resolvedTimestamp: -1
2023-10-25 19:17:33,250 INFO com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction [] - snapshotState checkpoint: 13 at resolvedTimestamp: -1
2023-10-25 19:17:36,250 INFO com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction [] - snapshotState checkpoint: 14 at resolvedTimestamp: -1
2023-10-25 19:17:39,249 INFO com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction [] - snapshotState checkpoint: 15 at resolvedTimestamp: -1

【Attachments】

The Flink CDC version in use should actually be 2.3.0:
flink-sql-connector-oceanbase-cdc-2.3.0.jar
The system table oceanbase.__all_virtual_server_clog_stat belongs to older OceanBase versions; because the jar version is old, it still queries the old system table.

It is recommended to switch to a newer Flink CDC version.

Also, when configuring the sys credentials for logproxy, the username must not include the tenant name; here it should simply be root (see the sketch below).
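A sketch of the corrected step (the password placeholder is illustrative): regenerate both encrypted strings with the bare sys username, update conf.json, and restart logproxy.

./bin/logproxy -x root
./bin/logproxy -x <sys_password>

Paste the two printed strings into ob_sys_username and ob_sys_password in conf.json.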

The error from the clog internal table query occurs because logproxy's check logic has not yet been adapted to the newer OB version. It does not affect data subscription, and this check may be removed in the future.

The OceanBase Flink CDC source cannot capture data after a restart. After switching the Flink and CDC versions, synchronization worked normally, but after restarting Flink no data can be fetched again. How should I troubleshoot this?

No data is captured, and after waiting for a while a timeout is reported.

The current version information is as follows:

oblogproxy-ce-for-4x-1.1.3-20230804144645
OceanBase_CE 4.1.0.0 (r100000202023040520-0765e69043c31bf86e83b5d618db0530cf31b707)
flink-1.17.0
flink-sql-connector-oceanbase-cdc-2.4.0.jar

Create the sync task in Flink SQL:

CREATE TABLE source_ob_tbl4 (
  col1 INT PRIMARY KEY NOT ENFORCED,
  col2 VARCHAR(20),
  col3 INT)
WITH ('connector' = 'oceanbase-cdc',
  'scan.startup.mode' = 'initial',
  'tenant-name' = 'tenant**',
  'username' = 'root@tenant**#obcluster',
  'password' = '**',
  'table-list' = 'oa.tbl4',
  'hostname' = '192.168.10.155',
  'jdbc.driver' = 'com.oceanbase.jdbc.Driver',
  'port' = '2883',
  'rootserver-list' = '192.168.10.155:2882:2881;192.168.10.157:2882:2881;192.168.10.158:2882:2881',
  'logproxy.host' = '192.168.10.155',
  'logproxy.port' = '2983',
  'connect.timeout' = '300000',
  'working-mode' = 'memory');

Log information

oblogproxy log:


Flink log:
2023-11-06 09:19:29,479 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Received JobGraph submission ‘collect’ (80b32cfc9550315fc526758d2f0889ee).

2023-11-06 09:19:29,479 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Submitting job ‘collect’ (80b32cfc9550315fc526758d2f0889ee).

2023-11-06 09:19:29,484 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_9 .

2023-11-06 09:19:29,485 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job ‘collect’ (80b32cfc9550315fc526758d2f0889ee).

2023-11-06 09:19:29,486 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off time strategy NoRestartBackoffTimeStrategy for collect (80b32cfc9550315fc526758d2f0889ee).

2023-11-06 09:19:29,489 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Created execution graph ada295705f8c7b03a59bac7ed0dbdf28 for job 80b32cfc9550315fc526758d2f0889ee.

2023-11-06 09:19:29,490 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Running initialization on master for job collect (80b32cfc9550315fc526758d2f0889ee).

2023-11-06 09:19:29,490 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Successfully ran initialization on master in 0 ms.

2023-11-06 09:19:29,491 INFO org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 new pipelined regions in 0 ms, total 1 pipelined regions currently.

2023-11-06 09:19:29,492 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@27a182c9

2023-11-06 09:19:29,492 INFO org.apache.flink.runtime.state.StateBackendLoader [] - State backend loader loads the state backend as HashMapStateBackend

2023-11-06 09:19:29,492 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Checkpoint storage is set to ‘jobmanager’

2023-11-06 09:19:29,493 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No checkpoint found during restore.

2023-11-06 09:19:29,494 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@ab8314d for collect (80b32cfc9550315fc526758d2f0889ee).

2023-11-06 09:19:29,495 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Starting execution of job ‘collect’ (80b32cfc9550315fc526758d2f0889ee) under job master id 00000000000000000000000000000000.

2023-11-06 09:19:29,495 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]

2023-11-06 09:19:29,495 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job collect (80b32cfc9550315fc526758d2f0889ee) switched from state CREATED to RUNNING.

2023-11-06 09:19:29,496 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: source_ob_tbl4[1] → ConstraintEnforcer[2] → Sink: Collect table sink (1/1) (ada295705f8c7b03a59bac7ed0dbdf28_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from CREATED to SCHEDULED.

2023-11-06 09:19:29,497 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Connecting to ResourceManager akka.tcp://flink@localhost:6123/user/rpc/resourcemanager_*(00000000000000000000000000000000)

2023-11-06 09:19:29,498 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Resolved ResourceManager address, beginning registration

2023-11-06 09:19:29,499 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering job manager 00000000000000000000000000000000@akka.tcp://flink@localhost:6123/user/rpc/jobmanager_9 for job 80b32cfc9550315fc526758d2f0889ee.

2023-11-06 09:19:29,499 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registered job manager 00000000000000000000000000000000@akka.tcp://flink@localhost:6123/user/rpc/jobmanager_9 for job 80b32cfc9550315fc526758d2f0889ee.

2023-11-06 09:19:29,500 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000.

2023-11-06 09:19:29,500 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job 80b32cfc9550315fc526758d2f0889ee: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]

2023-11-06 09:19:29,597 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: source_ob_tbl4[1] → ConstraintEnforcer[2] → Sink: Collect table sink (1/1) (ada295705f8c7b03a59bac7ed0dbdf28_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from SCHEDULED to DEPLOYING.

2023-11-06 09:19:29,598 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Deploying Source: source_ob_tbl4[1] → ConstraintEnforcer[2] → Sink: Collect table sink (1/1) (attempt #0) with attempt id ada295705f8c7b03a59bac7ed0dbdf28_cbc357ccb763df2852fee8c4fc7d55f2_0_0 and vertex id cbc357ccb763df2852fee8c4fc7d55f2_0 to localhost:6704-895fc4 @ localhost (dataPort=26708) with allocation id 37866fdec06ab581c002006d4adbe2e7

2023-11-06 09:19:29,834 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: source_ob_tbl4[1] → ConstraintEnforcer[2] → Sink: Collect table sink (1/1) (ada295705f8c7b03a59bac7ed0dbdf28_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from DEPLOYING to INITIALIZING.

2023-11-06 09:19:29,870 INFO org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator [] - Received sink socket server address: localhost/127.0.0.1:21863

2023-11-06 09:19:29,870 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: source_ob_tbl4[1] → ConstraintEnforcer[2] → Sink: Collect table sink (1/1) (ada295705f8c7b03a59bac7ed0dbdf28_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from INITIALIZING to RUNNING.

2023-11-06 09:19:29,931 INFO org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator [] - Sink connection established

2023-11-06 09:24:29,899 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: source_ob_tbl4[1] → ConstraintEnforcer[2] → Sink: Collect table sink (1/1) (ada295705f8c7b03a59bac7ed0dbdf28_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED on localhost:6704-895fc4 @ localhost (dataPort=26708).

java.util.concurrent.TimeoutException: Timeout to receive messages in RecordListener

at com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.readChangeRecords(OceanBaseRichSourceFunction.java:374) ~[flink-sql-connector-oceanbase-cdc-2.4.0.jar:2.4.0]

at com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.run(OceanBaseRichSourceFunction.java:158) ~[flink-sql-connector-oceanbase-cdc-2.4.0.jar:2.4.0]

at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist-1.17.0.jar:1.17.0]

at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) ~[flink-dist-1.17.0.jar:1.17.0]

at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333) ~[flink-dist-1.17.0.jar:1.17.0]

2023-11-06 09:24:29,901 INFO org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job 80b32cfc9550315fc526758d2f0889ee

2023-11-06 09:24:29,901 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job collect (80b32cfc9550315fc526758d2f0889ee) switched from state RUNNING to FAILING.

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy

at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139) ~[flink-dist-1.17.0.jar:1.17.0]

at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83) ~[flink-dist-1.17.0.jar:1.17.0]

at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258) ~[flink-dist-1.17.0.jar:1.17.0]

at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:249) ~[flink-dist-1.17.0.jar:1.17.0]

at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:242) ~[flink-dist-1.17.0.jar:1.17.0]

at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:748) ~[flink-dist-1.17.0.jar:1.17.0]

at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:725) ~[flink-dist-1.17.0.jar:1.17.0]

at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80) ~[flink-dist-1.17.0.jar:1.17.0]

at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:479) ~[flink-dist-1.17.0.jar:1.17.0]

at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source) ~[?:?]

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_242]

at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_242]

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309) ~[flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307) ~[flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222) ~[flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) ~[flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at akka.actor.ActorCell.invoke(ActorCell.scala:547) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_242]

at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_242]

at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_242]

at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) [?:1.8.0_242]

Caused by: java.util.concurrent.TimeoutException: Timeout to receive messages in RecordListener

at com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.readChangeRecords(OceanBaseRichSourceFunction.java:374) ~[flink-sql-connector-oceanbase-cdc-2.4.0.jar:2.4.0]

at com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.run(OceanBaseRichSourceFunction.java:158) ~[flink-sql-connector-oceanbase-cdc-2.4.0.jar:2.4.0]

at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist-1.17.0.jar:1.17.0]

at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) ~[flink-dist-1.17.0.jar:1.17.0]

at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333) ~[flink-dist-1.17.0.jar:1.17.0]

2023-11-06 09:24:29,914 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job collect (80b32cfc9550315fc526758d2f0889ee) switched from state FAILING to FAILED.

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy

at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139) ~[flink-dist-1.17.0.jar:1.17.0]

at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83) ~[flink-dist-1.17.0.jar:1.17.0]

at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258) ~[flink-dist-1.17.0.jar:1.17.0]

at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:249) ~[flink-dist-1.17.0.jar:1.17.0]

at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:242) ~[flink-dist-1.17.0.jar:1.17.0]

at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:748) ~[flink-dist-1.17.0.jar:1.17.0]

at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:725) ~[flink-dist-1.17.0.jar:1.17.0]

at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80) ~[flink-dist-1.17.0.jar:1.17.0]

at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:479) ~[flink-dist-1.17.0.jar:1.17.0]

at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source) ~[?:?]

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_242]

at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_242]

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309) ~[flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307) ~[flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222) ~[flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) ~[flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at akka.actor.ActorCell.invoke(ActorCell.scala:547) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_bd3f1ada-d4d4-4d4c-a73a-a05ea3417c2f.jar:1.17.0]

at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_242]

at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_242]

at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_242]

at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) [?:1.8.0_242]

Caused by: java.util.concurrent.TimeoutException: Timeout to receive messages in RecordListener

at com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.readChangeRecords(OceanBaseRichSourceFunction.java:374) ~[flink-sql-connector-oceanbase-cdc-2.4.0.jar:2.4.0]

at com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.run(OceanBaseRichSourceFunction.java:158) ~[flink-sql-connector-oceanbase-cdc-2.4.0.jar:2.4.0]

at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist-1.17.0.jar:1.17.0]

at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) ~[flink-dist-1.17.0.jar:1.17.0]

at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333) ~[flink-dist-1.17.0.jar:1.17.0]

2023-11-06 09:24:29,914 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Stopping checkpoint coordinator for job 80b32cfc9550315fc526758d2f0889ee.

2023-11-06 09:24:29,915 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 80b32cfc9550315fc526758d2f0889ee reached terminal state FAILED.

org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy

at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)

at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)

at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)

at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:249)

at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:242)

at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:748)

at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:725)

at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80)

at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:479)

at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)

at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)

at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)

at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)

at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)

at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)

at akka.actor.Actor.aroundReceive(Actor.scala:537)

at akka.actor.Actor.aroundReceive$(Actor.scala:535)

at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)

at akka.actor.ActorCell.invoke(ActorCell.scala:547)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)

at akka.dispatch.Mailbox.run(Mailbox.scala:231)

at akka.dispatch.Mailbox.exec(Mailbox.scala:243)

at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)

at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)

at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)

at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

Caused by: java.util.concurrent.TimeoutException: Timeout to receive messages in RecordListener

at com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.readChangeRecords(OceanBaseRichSourceFunction.java:374)

at com.ververica.cdc.connectors.oceanbase.source.OceanBaseRichSourceFunction.run(OceanBaseRichSourceFunction.java:158)

at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)

at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)

at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333)

2023-11-06 09:24:29,917 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job 80b32cfc9550315fc526758d2f0889ee has been registered for cleanup in the JobResultStore after reaching a terminal state.

2023-11-06 09:24:29,918 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Stopping the JobMaster for job ‘collect’ (80b32cfc9550315fc526758d2f0889ee).

2023-11-06 09:24:29,921 INFO org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore [] - Shutting down

2023-11-06 09:24:29,921 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Disconnect TaskExecutor localhost:6704-895fc4 because: Stopping JobMaster for job ‘collect’ (80b32cfc9550315fc526758d2f0889ee).

2023-11-06 09:24:29,921 INFO org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool [] - Releasing slot [37866fdec06ab581c002006d4adbe2e7].

2023-11-06 09:24:29,922 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Close ResourceManager connection 5534fcf9e4e5b659aeb6cb8ddcc2608c: Stopping JobMaster for job ‘collect’ (80b32cfc9550315fc526758d2f0889ee).

2023-11-06 09:24:29,922 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Disconnect job manager

The specific cause cannot be determined from this yet. However, connect.timeout should not be a bare number; try a duration value larger than the default '30s' (see the example below).
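For example (the 120s value is only an illustration, any duration comfortably above the 30s default should do), the option in the WITH clause would look like:

'connect.timeout' = '120s'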

I have waited a long time and no data has been captured. How should I continue troubleshooting?

You can use the client test package oblogclient-demo.zip to verify connectivity from the Flink machine to oblogproxy. If there is still no error but the client never receives data, please package the log files from logproxy's log and run/${client id}/log directories and upload them for review.
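As a quick supplementary check before running the demo package (a basic TCP reachability test, not a substitute for oblogclient-demo), from the Flink host:

# 192.168.10.155:2983 is the logproxy address used in the DDL above
nc -vz 192.168.10.155 2983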

When troubleshooting the Flink-to-logproxy connection, you can set logproxy.client.id so the run/${client id}/log directory can be located quickly. Also, in the Flink environment, make sure the old dependency jars are removed from lib.
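A sketch of what that could look like (the client id string is just an example):

'logproxy.client.id' = 'flink_ob_tbl4_debug'

The matching logproxy logs would then be under run/flink_ob_tbl4_debug/log/ in the logproxy deployment directory, and the older flink-sql-connector-oceanbase-cdc jars (2.2.0 / 2.3.0) should be removed from Flink's lib before restarting.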