Flink CDC 读取 Oceanbase binlog, 报错: Timeout to receive log messages in LogProxyClient.RecordListener

【 使用环境 】测试环境
【 OB or 其他组件 】
【 使用版本 】 Oceanbase 4.3.0.1 CE
【问题描述】使用 Flink CDC 读取 Oceanbase binlog, Flink 日志 报错: com.oceanbase.clogproxy.client.exception.LogProxyClientException: unsupported protocol version: 19968
【复现路径】问题出现前后相关操作
环境:

  • OBLogProxy 2.02 CE
  • OCP 4.2.2-20240315150922 CE
  • Flink 1.16
  • flink-connector-oceanbase-cdc-3.1.1.jar
  • ob-configserver-1.0.0-2.el7-feca6b9c76e26ac49464f34bfa0780b5a8d3f4a0

flink 相关日志:
flink.log (401.4 KB)

ObLogproxy 则提示:
Unexpected seq num, expected value is 1, actual value is 51
oblogproxy.tar.gz (99.4 KB)
请问是 oceanbase-cdc 的版本跟 ObLogproxy 不符吗? 需要降低 oceanbase-cdc 的版本吗?
但是 oceanbase-cdc-2.2.0 也是出现相似的情况

logproxy 的 binlog 模式应该搭配 flink mysql cdc 来使用,相当于把 observer + logproxy + obproxy 整体看作一个 mysql 实例这样,连接信息都是用 jdbc 的(也就是连接 obproxy 的)。参考 https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/mysql-cdc/ ,建议用最新版 3.1.1 。

刚刚测试了 flink 1.18 + mysql cdc 3.1.1, mysql cdc 读取完全量数据后, 似乎无法正常切换到增量数据, 一直在提示: threadExecutor is shut down, terminating keepalive thread
mysql_cdc.log (59.1 KB)

OBLogProxy 日志则报错:
Can not read response from client. Expected to read 4 bytes, read 0 bytes before connection was unexpectedly lost.
oblogproxy.tar.gz (1.2 MB)

试下启动参数加上 scan.incremental.snapshot.backfill.skip=true ,目前的 backfill 处理会在每个全量数据切片里启动一个 binlog 连接,这个对 binlog service 来说可能负载有点大。

加上了 参数 scan.incremental.snapshot.backfill.skip=true, 似乎没起作用, 现象与没加之前是一致的;
请问可以使用 mysql cdc 2.x 吗?
我这边使用 mysql cdc 2.4.0 测试, 遇到了 Failed to deserialize data of EventHeaderV4 错误
mysql_cdc_2.4.log (336.3 KB)
logproxy.tar.gz (209.1 KB)

flink mysql cdc 也可以用 2.4.x。

麻烦问下您那边现在用的 obproxy 是哪个版本?历史版本的 obproxy ce 版本可能在并发请求 binlog 时会有些问题,如果可以的话最好升到最新的 4.2.3。

另外 flink 这边的报错看起来和这个类似,很可能还是因为长时间拿不到数据造成的 Error while deserializing binlog event at offset · Issue #61 · apache/flink-cdc · GitHub

obproxy 的版本是 4.2.3.0 ce

请问川粉老师, 这类问题应该如何排查

这条日志显示您在应用程序(flink sql)里填的用户名没有带上集群名,需要改一下

[2024-07-10 21:30:05] [info] connection.cpp(210): The user binlog_user@ocp_meta on connection [binlog_user@ocp_meta,,]192.168.2.182:2983-192.168.2.183:37204/9 is not a valid ob username

这个报错显示找不到执行文件,需要看一下是不是磁盘满了,以及目录下是否有所用的租户对应的文件。正常情况下用 ocp_meta 租户的时候目录路径应该也是 ocp_meta,不清楚这里为什么是 ocp_monitor?

[2024-07-10 21:30:01] [error] fs_util.cpp(430): Failed to calculate disk sizeNo such file or directory for /usr/local/oblogproxy/run/sgoceanbase/ocp_monitor (deleted)/data/

之前是两个租户都有添加binlog 服务, 后面把ocp_monitor 的binlog 服务停止了; 但是停止之后 show binlog status 还能查到该租户相关的binglog, 在 oblogproxy/run 路径下还有 ocp_monitor租户 相关的文件存在, 就手动把那个路径删了, 然后日志中就有了Failed to calculate disk size for ocp_monitor (deleted)/data/ 相关错误; 后面重新建了一个ocp_monitor 相关的空路径, ``Failed to calculate disk size…` 报错就消失了

关于这个错误, 似乎跟集群名关系不是很大, 我尝试过带上跟不带集群名, 都会出现这个 is not a valid ob username 提示

现在 SHOW MASTER STATUSSHOW BINLOG STATUS 返回是什么?

现在 SHOW MASTER STATUSSHOW BINLOG STATUS 看着没什么问题

obclient [(none)]> show binlog status\G
*************************** 1. row ***************************
cluster: sgoceanbase
 tenant: ocp_meta
 status: {
        "binlog_files" :
        [
                {
                        "binlog_name" : "mysql-bin.000001",
                        "binlog_size" : 524288251
                },
                {
                        "binlog_name" : "mysql-bin.000002",
                        "binlog_size" : 524288096
                },
                {
                        "binlog_name" : "mysql-bin.000003",
                        "binlog_size" : 524288078
                },
                {
                        "binlog_name" : "mysql-bin.000004",
                        "binlog_size" : 64105871
                }
        ],
        "client_id" : "/usr/local/oblogproxy/run/sgoceanbase/ocp_meta",
        "cpu_status" :
        {
                "cpu_count" : 8,
                "cpu_used_ratio" : 1.5647226572036743
        },
        "disk_status" :
        {
                "disk_total_size_mb" : 51175,
                "disk_usage_size_process_mb" : 1561,
                "disk_used_ratio" : 0.7589448094367981,
                "disk_used_size_mb" : 38839
        },
        "memory_status" :
        {
                "mem_total_size_mb" : 11834,
                "mem_used_ratio" : 0.0,
                "mem_used_size_mb" : 1000
        },
        "network_status" :
        {
                "network_rx_bytes" : 0,
                "network_wx_bytes" : 0
        },
        "pid" : 25270
}
1 row in set (0.011 sec)
obclient [oceanbase]> show master status;
+------------------+----------+--------------+------------------+------------------------------------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set                              |
+------------------+----------+--------------+------------------+------------------------------------------------+
| mysql-bin.000004 | 63912272 |              |                  | 64db67ca-3eb4-11ef-8570-0050562ef501:1-3737768 |
+------------------+----------+--------------+------------------+------------------------------------------------+
1 row in set (0.682 sec)

用这个工具连一下试试呢? flink cdc 里 debezium 实际也是用的这个工具,按道理一样的参数是有相同的效果的。https://github.com/osheroff/mysql-binlog-connector-java

    public static void main(String[] args) throws Exception {
        // 连接信息换成你的环境
        BinaryLogClient client =
                new BinaryLogClient("127.0.0.1", 2883, "root@test", "********");
        client.registerEventListener(
                event -> {
                    if (event.getHeader().getEventType() != EventType.HEARTBEAT) {
                        LOG.info(event.toString());
                    }
                });
        try {
            client.connect();
        } catch (Exception e) {
            LOG.error(e.toString());
        } finally {
            client.disconnect();
        }
    }

通过这个工具可以读到数据, 但是读取了几条数据之后就断开了; 相同的代码测试MySQL 数据库, 则不存在该问题

17:40:52 INFO Test: Event{header=EventHeaderV4{timestamp=1721295651000, eventType=TABLE_MAP, serverId=1147473732, headerLength=19, dataLength=34, nextPosition=212353071, flags=1}, data=TableMapEventData{tableId=19, database='db88', table='student', columnTypes=3, 15, columnMetadata=0, 256, columnNullability={1}, eventMetadata=null}}
17:40:52 INFO Test: Event{header=EventHeaderV4{timestamp=1721295651000, eventType=EXT_WRITE_ROWS, serverId=1147473732, headerLength=19, dataLength=35, nextPosition=212353125, flags=1}, data=WriteRowsEventData{tableId=19, includedColumns={0, 1}, rows=[
    [2000014, hello world1]
]}}

运行日志如下:
shyiko.mysql.log (166.2 KB)
logproxy.zip (3.4 MB)

测试代码如下:

public static void main(String[] args) throws IOException, InterruptedException {
        Logger logger = Logger.getLogger("Test");

        // 连接信息换成你的环境
        BinaryLogClient client =
                new BinaryLogClient("192.168.2.183", 2883, "root@ocp_meta", "******");
        client.registerEventListener(
                event -> {
                    if (event.getHeader().getEventType() != EventType.HEARTBEAT) {
                        logger.info((event.toString()));
                    }
                });
        
        sleep(3000);
        try {
            client.setBinlogPosition(0);
            client.connect();
        } catch (Exception e) {
            logger.error(e.toString());
        } finally {
            client.disconnect();
        }
    }