Flink CDC reports errors when connecting to oblogproxy

【 Environment 】Production or test environment
Test environment

【 OB or other components 】
OB, Flink CDC

【 Version 】
4.1.0

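【Problem description】Describe the problem clearly

Flink SQL connects to oblogproxy, but neither the full snapshot nor the incremental data can be read.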

oblogproxy log:
Log line format: [IWEF]yyyymmdd hh:mm:ss.uuuuuu threadid file:line] msg
I20230531 10:42:31.850749 122683 thread.cpp:50] +++ Create thread: (122676)
I20230531 10:42:31.851044 122683 file_gc.cpp:34] file gc size quota MB: 5242880
I20230531 10:42:31.851051 122683 file_gc.cpp:35] file gc time quota day: 7
I20230531 10:42:31.851055 122683 file_gc.cpp:36] file gc path: ./log
I20230531 10:42:31.851061 122683 file_gc.cpp:38] file gc prefixs: ./bin/logproxy.log
I20230531 10:42:31.851066 122683 file_gc.cpp:38] file gc prefixs: liboblog.log.2
I20230531 10:42:31.851069 122683 file_gc.cpp:38] file gc prefixs: logproxy_
I20230531 10:42:31.851161 122675 channel_factory.cpp:50] ChannelFactory init with server mode
I20230531 10:42:31.851382 122684 thread.cpp:50] +++ Create thread: (122677)
I20230531 10:42:31.851399 122675 comm.cpp:105] +++ Listen on port: 2983, fd: 7
I20230531 10:42:31.851411 122675 comm.cpp:111] +++ Communicator about to start
E20230531 10:42:31.881721 122684 sys_metric.cpp:48] Failed to collect network for invalid content:default via 172.18.2.254 dev ens192 proto static metric 100
E20230531 10:42:31.882000 122684 status_thread.cpp:34] Failed to initiate metric, exit StatusThread
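
For anyone triaging a log like the one above, here is a minimal sketch that parses lines in the format given in the log header (`[IWEF]yyyymmdd hh:mm:ss.uuuuuu threadid file:line] msg`) and keeps only the error and fatal entries. The function names `parse_log_line` and `errors_only` are made up for this illustration and are not part of oblogproxy.

```python
import re
from datetime import datetime

# Regex for "[IWEF]yyyymmdd hh:mm:ss.uuuuuu threadid file:line] msg",
# the format printed in the first line of the log.
LOG_LINE = re.compile(
    r"^(?P<level>[IWEF])(?P<date>\d{8}) (?P<time>\d{2}:\d{2}:\d{2}\.\d{6}) "
    r"(?P<thread>\d+) (?P<file>[^:\s]+):(?P<line>\d+)\] (?P<msg>.*)$"
)


def parse_log_line(text):
    """Return a dict for one log line, or None if the line does not match."""
    m = LOG_LINE.match(text.strip())
    if m is None:
        return None
    ts = datetime.strptime(m["date"] + " " + m["time"], "%Y%m%d %H:%M:%S.%f")
    return {
        "level": m["level"],
        "timestamp": ts,
        "thread": int(m["thread"]),
        "file": m["file"],
        "line": int(m["line"]),
        "message": m["msg"],
    }


def errors_only(lines):
    """Keep only the E (error) and F (fatal) records from an iterable of lines."""
    records = (parse_log_line(line) for line in lines)
    return [rec for rec in records if rec is not None and rec["level"] in "EF"]
```

Feeding the log above through `errors_only` would leave only the two `E` lines from `sys_metric.cpp` and `status_thread.cpp`, which makes it easier to see that every other line is informational.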

Flink SQL CREATE TABLE statement:
CREATE TABLE orders1 (
  id INT,
  name STRING,
  pdata DATE,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'oceanbase-cdc',
  'scan.startup.mode' = 'initial',
  'username' = 'cdctest',
  'password' = '',
  'tenant-name' = 'sys',
  'database-name' = 'GAIA',
  'table-name' = 'orders',
  'hostname' = '172.18.2.106',
  'port' = '2883',
  'rootserver-list' = '172.18.2.104:2882:2881;172.18.2.105:2882:2881;172.18.2.106:2882:2881',
  'logproxy.host' = '172.18.2.105',
  'logproxy.port' = '2983'
);
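
A side note on quoting: the option keys and values in a `WITH (...)` clause are SQL string literals, so they need plain ASCII single quotes. Text that has passed through a forum editor or word processor often ends up with typographic quotes instead, and the statement will then fail to parse. The sketch below is one way to check and clean a statement before submitting it; `normalize_quotes` and `has_typographic_quotes` are hypothetical helper names. It replaces every typographic quote, including any that sit inside a string value on purpose.

```python
# Map typographic quotes to the ASCII quotes that SQL parsers expect.
QUOTE_MAP = str.maketrans({
    "\u2018": "'",   # left single quotation mark
    "\u2019": "'",   # right single quotation mark
    "\u201c": '"',   # left double quotation mark
    "\u201d": '"',   # right double quotation mark
})


def has_typographic_quotes(sql):
    """Return True if the text contains any curly single or double quote."""
    return any(ch in sql for ch in "\u2018\u2019\u201c\u201d")


def normalize_quotes(sql):
    """Return the text with curly quotes replaced by their ASCII equivalents."""
    return sql.translate(QUOTE_MAP)
```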

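【Steps to reproduce】Operations performed before and after the problem occurred

【Symptoms and impact】
Neither incremental nor full data is produced.

【Attachments】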


Please run show variables like "%version%";

show variables like "%version%";
+-------------------------+------------------------------------------------------------------------------------------------------------------+
| Variable_name           | Value                                                                                                            |
+-------------------------+------------------------------------------------------------------------------------------------------------------+
| ob_last_schema_version  | 0                                                                                                                |
| protocol_version        | 10                                                                                                               |
| tls_version             |                                                                                                                  |
| version                 | 5.7.25-OceanBase_CE-v4.1.0.0                                                                                     |
| version_comment         | OceanBase_CE 4.1.0.0 (r100000192023032010-0265dfc6d00ff4f0ff4ad2710504a18962abaef6) (Built Mar 20 2023 10:12:57) |
| version_compile_machine |                                                                                                                  |
| version_compile_os      |                                                                                                                  |
+-------------------------+------------------------------------------------------------------------------------------------------------------+
7 rows in set (0.001 sec)
Which version of Flink CDC are you using?

The latest OB version that logproxy currently supports is 4.1.0 CE. BP releases are not supported yet, so we need to confirm the exact minor version of your OB.
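
To make the "exact minor version" check easier to share, here is a small sketch that splits a `version_comment` value into its version number, build revision, commit hash, and build date. The regular expression is an assumption based only on the single sample value shown in this thread, and `parse_version_comment` is a hypothetical helper name. It does not decide whether a build is a BP release; it only extracts the fields so they can be compared against the release notes.

```python
import re

# Pattern modelled on the one sample in this thread, for example:
# "OceanBase_CE 4.1.0.0 (r<revision>-<commit>) (Built <date>)"
VERSION_COMMENT = re.compile(
    r"^(?P<edition>\S+) (?P<version>\d+(?:\.\d+){3}) "
    r"\(r(?P<revision>\d+)-(?P<commit>[0-9a-f]+)\) "
    r"\(Built (?P<built>[^)]+)\)$"
)


def parse_version_comment(text):
    """Return the named fields of a version_comment string, or None if it does not match."""
    m = VERSION_COMMENT.match(text.strip())
    return m.groupdict() if m else None
```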

Also, if possible, please share the Flink CDC version and the job logs so that we can look at them together.

Flink: 1.16

Flink CDC: 2.3.0

oceanbase-cdc connector package: flink-sql-connector-oceanbase-cdc-2.3.0.jar

Thanks


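It still looks like a problem with the JAR, but I have already added it to the Flink lib directory and restarted.


The Flink job no longer logs any errors, but it still cannot read any data. Could you take a look?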

This is OB 4.1 BP1, which logproxy does not support yet.

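I installed OB through the OBD web UI. Is it possible to install a non-BP build of OB through the OBD UI? Thanks.


I have reinstalled a non-BP build of OB.

The current Flink SQL statement is: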

CREATE TABLE orders2 (
  id INT,
  name STRING,
  pdata DATE,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'oceanbase-cdc',
  'scan.startup.mode' = 'initial',
  'username' = 'cdctest@cdc_tenant1',
  'password' = 'cdctestpwd',
  'tenant-name' = 'cdc_tenant1',
  'database-name' = 'xxxx',
  'table-name' = 'orders',
  'hostname' = '172.18.2.106',
  'port' = '2883',
  'rootserver-list' = '172.18.2.104:2882:2881;172.18.2.105:2882:2881;172.18.2.106:2882:2881',
  'logproxy.host' = '172.18.2.105',
  'logproxy.port' = '2983'
);

oblogproxy is running normally:

INFO log:
I20230601 14:39:21.309943 327733 mysql_protocol.cpp:122] Auth user success of server: 172.18.2.104:2881, user: cdctest@cdc_tenant1
I20230601 14:39:21.309964 327733 mysql_protocol.cpp:214] Query obmysql SQL:show tenant
I20230601 14:39:21.351444 327733 mysql_protocol.cpp:214] Query obmysql SQL:SELECT upper(privilege_type) AS priv FROM information_schema.user_privileges WHERE privilege_type='SELECT' AND grantee LIKE "'cdctest'@%" UNION SELECT upper(privilege_type) AS priv FROM information_schema.schema_privileges WHERE privilege_type='SELECT' AND grantee LIKE "'cdctest'@%" AND table_schema='gaia' UNION SELECT upper(privilege_type) AS priv FROM information_schema.table_privileges WHERE privilege_type='SELECT' AND grantee LIKE "'cdctest'@%" AND table_schema='gaia' AND table_name='orders'
I20230601 14:39:21.392273 327733 io.cpp:119] Connect to server success after poll. host=172.18.2.104,port=2881
I20230601 14:39:21.392306 327733 mysql_protocol.cpp:51] Connect to server success: 172.18.2.104:2881, user: cdctest
I20230601 14:39:21.392390 327733 mysql_protocol.cpp:82] Receive handshake packet from server: 172.18.2.104:2881, user: cdctest
I20230601 14:39:21.392400 327733 ob_mysql_packet.cpp:256] Observer version: 5.7.25
I20230601 14:39:21.392410 327733 ob_mysql_packet.cpp:264] Connection id: 3221729106
I20230601 14:39:21.392417 327733 ob_mysql_packet.cpp:355] Auth plugin name: mysql_native_password
I20230601 14:39:21.392441 327733 ob_mysql_packet.cpp:589] Handshake response packet len: 83
I20230601 14:39:21.397019 327733 mysql_protocol.cpp:122] Auth user success of server: 172.18.2.104:2881, user: cdctest
I20230601 14:39:21.397049 327733 mysql_protocol.cpp:214] Query obmysql SQL:SELECT svr_min_log_timestamp FROM oceanbase.__all_virtual_server_clog_stat WHERE zone_status='ACTIVE';
I20230601 14:39:21.437605 327733 ob_mysql_packet.cpp:220] Error packet: [31487][#42S02] Table 'oceanbase.__all_virtual_server_clog_stat' doesn't exist
E20230601 14:39:21.437644 327733 mysql_protocol.cpp:239] Failed to query observer:Table 'oceanbase.__all_virtual_server_clog_stat' doesn't exist, unexpected column count: 0
E20230601 14:39:21.437671 327733 clog_meta_routine.cpp:45] Failed to check the existence of svr_min_log_timestamp column in __all_virtual_server_clog_stat, disable clog check
I20230601 14:39:21.437724 327733 arranger.cpp:223] Client connecting: type:0, id:172.18.2.104_345791_1685601559_1_cdc_tenant1, ip:172.18.2.106, version:1.0.5, configuration:tb_white_list=cdc_tenant1.gaia.orders cluster_user=cdctest@cdc_tenant1 tb_black_list=| timezone=+00:00 working_mode=storage rootserver_list=172.18.2.104:2882:2881;172.18.2.105:2882:2881;172.18.2.106:2882:2881 first_start_timestamp=0 cluster_password=A0D8D0C429643E3EC1C5758F1759A6FD24098F2E , pid:0, peer:fd:10, register_time:1685601560, enable_monitor:0, packet_version:2,
I20230601 14:39:21.439268 327733 source_invoke.cpp:75] +++ Created oblogreader with pid: 336090
I20230601 14:39:21.439311 327733 comm.cpp:324] Try close Channel of peer: id:7638688448324960266, fd:10, addr:1778520748, port:6218
I20230601 14:39:21.439360 327733 channel.h:46] Closed fd: 10
I20230601 14:39:21.439370 327733 arranger.cpp:241] Remove peer: id:7638688448324960266, fd:10, addr:1778520748, port:6218 after source invoked, current channel count:0
I20230601 14:39:21.439390 327733 arranger.cpp:246] Client connected: 172.18.2.104_345791_1685601559_1_cdc_tenant1 with peer: id:7638688448324960266, fd:10, addr:1778520748, port:6218

However, Flink SQL still cannot read either the full or the incremental data. Could you please help take a look?
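
One thing that can be checked from the INFO log is what the client actually sent to logproxy. The "Client connecting" line carries a `configuration:` section made of space-separated `key=value` pairs, including `tb_white_list` in the form `tenant.database.table`. The sketch below pulls those pairs out so they can be compared with the options in the Flink SQL statement. It assumes the layout of that one log line (pairs separated by spaces, section ending before `, pid:`), and `parse_client_configuration` is a hypothetical helper name.

```python
def parse_client_configuration(log_line):
    """Extract the key=value pairs that follow 'configuration:' in a log line."""
    marker = "configuration:"
    start = log_line.find(marker)
    if start == -1:
        return {}
    rest = log_line[start + len(marker):]
    # In the sample line the section is followed by ", pid:...".
    end = rest.find(", pid:")
    if end != -1:
        rest = rest[:end]
    config = {}
    for token in rest.split():
        key, sep, value = token.partition("=")
        if sep:  # skip tokens that are not key=value pairs
            config[key] = value
    return config


def expected_white_list(tenant, database, table):
    """Build the tenant.database.table string to compare with tb_white_list."""
    return f"{tenant}.{database}.{table}"
```

Comparing `parse_client_configuration(line)["tb_white_list"]` with `expected_white_list(...)` for the intended tenant, database, and table is a quick way to confirm that the job is subscribed to the table you expect.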

The test has passed now.

What was the problem in the end, and how did you resolve it?

How did you finally solve this problem?