OBLogProxy开启Binlog模式但Flink Cdc Mysql读取失败

大佬请教下,我通过命令查看show
parameters like ‘%config_url%’; 得出的是


那么我的binlog service配置命令应该是咋写呢?
CREATE BINLOG FOR TENANT obcluster.sys TO USER root PASSWORD XXX WITH CLUSTER URL ‘cluster_url’, SERVER UUID ‘d053fe39-df59-11ee-afeb-000c290d5b5d’;

首先命令不能是sys租户,另外不需要server uuid,cluster url可以通过命令来查询,如果不存在 cluster url,需要 ocp 或者 config server 来提供。

老师,您好,我这边新建了租户,并且程序拉起了ob-configserver
image

OBProxy已按该章节进行配置
https://www.oceanbase.com/docs/community-oblogproxy-doc-1000000000531989#1-title-配置%20OBProxy
查看参数已配置

执行了binlog配置命令
CREATE BINLOG FOR TENANT obcluster.jhdcp TO USER root PASSWORD 123456 WITH CLUSTER URL ‘http://192.168.101.41:3881/services?Action=GetObProxyConfig’;

可是我发现转换进程并没有启起来

目录下也无文件
image

查看cat log/logproxy.log发现有异常

请问一下有啥思路莫老师。

看一下 run 目录下面的日志

另外发一下配置文件

{
  "service_port": 2983,
  "encode_threadpool_size": 8,
  "encode_queue_size": 20000,
  "max_packet_bytes": 67108864,
  "record_queue_size": 20000,
  "read_timeout_us": 2000000,
  "read_fail_interval_us": 1000000,
  "read_wait_num": 20000,
  "send_timeout_us": 2000000,
  "send_fail_interval_us": 1000000,
  "check_quota_enable": false,
  "check_clog_enable": true,
  "command_timeout_s": 10,
  "log_quota_size_mb": 5120,
  "log_quota_day": 7,
  "log_gc_interval_s": 43200,
  "log_level": 2,
  "log_flush_strategy": 1,
  "log_flush_level": 2,
  "log_flush_period_s": 1,
  "log_max_file_size_mb": 1024,
  "log_retention_h": 360,
  "oblogreader_path_retain_hour": 168,
  "oblogreader_lease_s": 300,
  "oblogreader_path": "/usr/local/oblogproxy/run",
  "bin_path": "/usr/local/oblogproxy/bin",
  "oblogreader_obcdc_ce_path_template": "/usr/local/oblogproxy/obcdc/obcdc-ce-%d.x-access/libobcdc.so",
  "allow_all_tenant": true,
  "auth_user": true,
  "auth_use_rs": false,
  "auth_allow_sys_user": true,
  "ob_sys_username": "CFBF5360D0D8B09534FB1C26AE2D7BD7",
  "ob_sys_password": "B4038DFBFC27A0AB1F50D09A3F3048B0",
  "counter_interval_s": 2,
  "metric_enable": true,
  "metric_interval_s": 10,
  "debug": false,
  "verbose": false,
  "verbose_packet": false,
  "verbose_record_read": false,
  "readonly": false,
  "count_record": false,
  "channel_type": "plain",
  "tls_ca_cert_file": "",
  "tls_cert_file": "",
  "tls_key_file": "",
  "tls_verify_peer": true,
  "liboblog_tls": false,
  "liboblog_tls_cert_path": "",
  "binlog_log_bin_basename": "/usr/local/oblogproxy/run",
  "binlog_obcdc_ce_path_template": "/usr/local/oblogproxy/obcdc/obcdc-ce-%d.x-access/libobcdc.so",
  "binlog_ignore_unsupported_event": true,
  "binlog_max_event_buffer_bytes": 67108864,
  "binlog_mode": true,
  "table_whitelist": "",
  "binlog_nof_work_threads": 16,
  "binlog_bc_work_threads": 2,
  "binlog_max_file_size_bytes": 524288000,
  "binlog_convert_timeout_us": 10000,
  "binlog_checksum": true,
  "binlog_heartbeat_interval_us": 1000000,
  "binlog_gtid_display": true,
  "binlog_ddl_convert": true,
  "binlog_memory_limit": "3G",
  "binlog_working_mode": "storage",
  "binlog_recover_backup": true,
  "wait_rotate_ready_max_try": 1000
}

curl 一下 cluster url,目前看上去 url 返回不对


response是正常的喔

问题还是在 url 上面,rslist 是空的

老师请教一下,这个得咋配置呢有文档莫

你是使用的 config server 吗,看一下 config server 的文档吧

好嘞,我再try try

感谢老师,可以了,我按config server 的文档进行配置obconfig_url 参数就可以。十分感谢老师的支持 :hugs:

操作:

进入命令行

obclient  -h'127.0.0.1' -P 2883 -u'root'@sys#obcluster -D oceanbase -A -p'Jhmk.1234'

查询obconfig_url

select name,value,svr_ip,svr_port from oceanbase.__all_virtual_sys_parameter_stat where name='obconfig_url';

如无则修改

alter system set obconfig_url = 'http://192.168.101.41:3881/services?Action=ObRootServiceInfo&ObCluster=obcluster';

老师,目前binlog日志已正常生成了
image

但我使用Flink CDC读取依旧是报错的,请教下这里是需要调整Flink CDC 的Mysql Source配置莫
目前是尝试连接了OceanBase集群的2883代理端口,以及oblogproxy代理的2983端口都有报错

Flink 代码 demo

public class FlinkCdcForMysqlToKafkaByDs {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> msyqlSource = MySqlSource.<String>builder()
                .hostname("192.168.101.41")
                .port(2883)
                .scanNewlyAddedTableEnabled(true)
                .databaseList("cdc_test")
                .tableList("cdc_test.ETL_FULL_OCEANBASE_DATATYPE_TEST")
                .username("root")
                .password("xxxx")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(3000L);

        DataStreamSource<String> oracleParallelSource = env.fromSource(
                        msyqlSource,
                        WatermarkStrategy.noWatermarks(),
                        "OceanBaseParallelSource")
                .setParallelism(4);

        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                "192.168.101.53:9092",
                "flink-cdc-test",
                new SimpleStringSchema()
        );

        oracleParallelSource.addSink(kafkaProducer);

        env.execute("OceanBase Snapshot + RedoLog");
    }
}

连接OceanBase集群异常栈

7459 [Source Data Fetcher for Source: mysqlParallelSource (1/4)#4] ERROR org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager  - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.util.FlinkRuntimeException: Read split MySqlSnapshotSplit{tableId=cdc_test.ETL_FULL_OCEANBASE_DATATYPE_TEST, splitId='cdc_test.ETL_FULL_OCEANBASE_DATATYPE_TEST:0', splitKeyType=[`bigint_test` BIGINT NOT NULL], splitStart=null, splitEnd=null, highWatermark=null} error due to org.apache.flink.util.FlinkRuntimeException: Cannot read the binlog filename and position via 'SHOW MASTER STATUS'. Make sure your server is correctly configured.
	at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.checkReadException(SnapshotSplitReader.java:325)
	at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.pollSplitRecords(SnapshotSplitReader.java:257)
	at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.pollSplitRecords(MySqlSplitReader.java:118)
	at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:80)
	at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
	... 6 more
Caused by: io.debezium.DebeziumException: org.apache.flink.util.FlinkRuntimeException: Cannot read the binlog filename and position via 'SHOW MASTER STATUS'. Make sure your server is correctly configured
	at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.execute(MySqlSnapshotSplitReadTask.java:123)
	at com.ververica.cdc.connectors.mysql.debezium.reader.SnapshotSplitReader.lambda$submitSplit$1(SnapshotSplitReader.java:136)
	... 3 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot read the binlog filename and position via 'SHOW MASTER STATUS'. Make sure your server is correctly configured
	at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.lambda$currentBinlogOffset$0(DebeziumUtils.java:130)
	at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:642)
	at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:510)
	at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.currentBinlogOffset(DebeziumUtils.java:117)
	at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.doExecute(MySqlSnapshotSplitReadTask.java:142)
	at com.ververica.cdc.connectors.mysql.debezium.task.MySqlSnapshotSplitReadTask.execute(MySqlSnapshotSplitReadTask.java:118)
	... 4 more

连接oblogproxy异常栈

4979 [Thread-29] ERROR org.apache.flink.runtime.source.coordinator.SourceCoordinator  - Failed to create Source Enumerator for source Source: OceanBaseParallelSource
org.apache.flink.util.FlinkRuntimeException: com.zaxxer.hikari.pool.HikariPool$PoolInitializationException: Failed to initialize pool: Variable 'transaction_read_only' is a GLOBAL variable 
	at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection(DebeziumUtils.java:71)
	at com.ververica.cdc.connectors.mysql.MySqlValidator.validate(MySqlValidator.java:72)
	at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:172)
	at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:222)
	at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.resetAndStart(RecreateOnResetOperatorCoordinator.java:405)
	at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$resetToCheckpoint$6(RecreateOnResetOperatorCoordinator.java:150)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.lambda$closeAsyncWithTimeout$0(ComponentClosingUtils.java:77)
	at java.lang.Thread.run(Thread.java:750)
Caused by: com.zaxxer.hikari.pool.HikariPool$PoolInitializationException: Failed to initialize pool: Variable 'transaction_read_only' is a GLOBAL variable 
	at com.zaxxer.hikari.pool.HikariPool.throwPoolInitializationException(HikariPool.java:596)
	at com.zaxxer.hikari.pool.HikariPool.checkFailFast(HikariPool.java:575)
	at com.zaxxer.hikari.pool.HikariPool.<init>(HikariPool.java:115)
	at com.zaxxer.hikari.HikariDataSource.<init>(HikariDataSource.java:81)
	at com.ververica.cdc.connectors.mysql.source.connection.PooledDataSourceFactory.createPooledDataSource(PooledDataSourceFactory.java:61)
	at com.ververica.cdc.connectors.mysql.source.connection.JdbcConnectionPools.getOrCreateConnectionPool(JdbcConnectionPools.java:49)
	at com.ververica.cdc.connectors.mysql.source.connection.JdbcConnectionFactory.connect(JdbcConnectionFactory.java:54)
	at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:888)
	at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:883)
	at io.debezium.jdbc.JdbcConnection.connect(JdbcConnection.java:411)
	at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection(DebeziumUtils.java:68)
	... 11 more
Caused by: java.sql.SQLException: Variable 'transaction_read_only' is a GLOBAL variable 
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
	at com.mysql.cj.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:1403)
	at com.mysql.cj.jdbc.ConnectionImpl.isReadOnly(ConnectionImpl.java:1388)
	at com.zaxxer.hikari.pool.PoolBase.setupConnection(PoolBase.java:408)
	at com.zaxxer.hikari.pool.PoolBase.newConnection(PoolBase.java:369)
	at com.zaxxer.hikari.pool.PoolBase.newPoolEntry(PoolBase.java:206)
	at com.zaxxer.hikari.pool.HikariPool.createPoolEntry(HikariPool.java:476)
	at com.zaxxer.hikari.pool.HikariPool.checkFailFast(HikariPool.java:561)
	... 20 more

obproxy版本是多少,另外要通过obproxy来连接binlog

@自凡 嗯嗯,我是通过obproxy进行连接的。

目前版本搭配是
OceanBase-ce V4.2.1_CE_BP3 & V4.2.1_CE_BP4
ObProxy V4.2.1.0 & V4.2.3.0
Ob-configserver V1.0.0
OBLogProxy V2.0.1

oblogproxy run 日志正常

手动执行 show master status; 看下呢

这个我试过咧,状态是正常的

为什么还是sys租户呢?创建binlog服务需要为业务租户创建 binlog服务,你这里是sys租户。

1 个赞