【Environment】Test environment
【Components】ob-ce, ob-proxy, ob-configserver, obbinlog, flink-connector-oceanbase-cdc, oblogclient-logproxy
【Versions (same order)】4.2.4.0, 4.3.6.1, 1.0.1, 4.2.4, 3.1.0, 1.1.3
【Problem description】The job fails with an error immediately on startup:
17:31:46.395 [log-proxy-client-worker-1-thread-1] INFO com.oceanbase.clogproxy.client.connection.ClientHandler - ClientId: 10.10.23.11_627026_1778319106_73_ibs6: rootserver_list=10.10.53.68:2882:2881, cluster_id=, cluster_user=root@ibs6#myoceanbase, cluster_password=, , sys_user=, sys_password=, tb_white_list=ibs6.ibs6_alarm.tbl_alarm_record_unhandle, tb_black_list=|, start_timestamp=1778319105, start_timestamp_us=0, timezone=+08:00, working_mode=storage connecting LogProxy: 10.10.53.68:2983
17:31:46.434 [log-proxy-client-worker-1-thread-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 4096
17:31:46.434 [log-proxy-client-worker-1-thread-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
17:31:46.434 [log-proxy-client-worker-1-thread-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.chunkSize: 32
17:31:46.434 [log-proxy-client-worker-1-thread-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.blocking: false
17:31:46.434 [log-proxy-client-worker-1-thread-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.batchFastThreadLocalOnly: true
17:31:46.441 [log-proxy-client-worker-1-thread-1] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true
17:31:46.441 [log-proxy-client-worker-1-thread-1] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
17:31:46.442 [log-proxy-client-worker-1-thread-1] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@7a757413
17:31:46.451 [log-proxy-client-worker-1-thread-1] ERROR com.oceanbase.clogproxy.client.connection.ClientHandler - Unsupported protocol version: 19968
17:31:46.453 [log-proxy-client-worker-1-thread-1] ERROR com.oceanbase.clogproxy.client.connection.ClientHandler - Exception occurred ClientId: 10.10.23.11_627026_1778319106_73_ibs6: rootserver_list=10.10.53.68:2882:2881, cluster_id=, cluster_user=root@ibs6#myoceanbase, cluster_password=, , sys_user=, sys_password=, tb_white_list=ibs6.ibs6_alarm.tbl_alarm_record_unhandle, tb_black_list=|, start_timestamp=1778319105, start_timestamp_us=0, timezone=+08:00, working_mode=storage, with LogProxy: 10.10.53.68:2983
com.oceanbase.clogproxy.client.exception.LogProxyClientException: Unsupported protocol version: 19968
at com.oceanbase.clogproxy.client.connection.ClientHandler.checkHeader(ClientHandler.java:276)
at com.oceanbase.clogproxy.client.connection.ClientHandler.handleHeader(ClientHandler.java:193)
at com.oceanbase.clogproxy.client.connection.ClientHandler.channelRead(ClientHandler.java:160)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
17:31:46.454 [log-proxy-client-worker-1-thread-1] INFO com.oceanbase.clogproxy.client.connection.ClientStream - Try to stop this client
17:31:48.395 [Thread-8] INFO com.oceanbase.clogproxy.client.connection.ClientStream - Client process thread exit
17:31:48.395 [log-proxy-client-worker-1-thread-1] INFO com.oceanbase.clogproxy.client.connection.ClientStream - Client stopped successfully
17:31:48.395 [log-proxy-client-worker-1-thread-1] INFO com.oceanbase.clogproxy.client.connection.ClientHandler - Channel closed with ClientId: 10.10.23.11_627026_1778319106_73_ibs6, LogProxy: 10.10.53.68:2983
17:31:49.684 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1778319109676 for job 27466c5f9f2e7e6663afc4033cb713e4.
17:31:49.687 [flink-akka.actor.default-dispatcher-8] DEBUG org.apache.flink.runtime.taskexecutor.TaskExecutor - Trigger checkpoint 1@1778319109676 for b5a2e6718526b922a5799f5baa5108b5.
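To make the number in "Unsupported protocol version: 19968" easier to interpret, it can be unpacked into the raw bytes the client presumably received. This is only a minimal sketch: it assumes the client reads the version field as a 2-byte big-endian value (I have not confirmed that in the client source), and the class and method names are made up for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class ProtocolVersionBytes {
    // Splits a 16-bit value into its two bytes, high byte first (big-endian).
    // Assumption: this matches how the client read the version field.
    static byte[] toBigEndianBytes(int value) {
        return ByteBuffer.allocate(2)
                .order(ByteOrder.BIG_ENDIAN)
                .putShort((short) value)
                .array();
    }

    // Formats bytes as space-separated two-digit upper-case hex.
    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append(String.format("%02X", b & 0xFF));
        }
        return sb.toString();
    }

    // Re-reads the same two bytes as an unsigned little-endian 16-bit value.
    static int asLittleEndian(byte[] bytes) {
        return ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getShort() & 0xFFFF;
    }

    public static void main(String[] args) {
        byte[] raw = toBigEndianBytes(19968);
        System.out.println(hex(raw));            // prints "4E 00"
        System.out.println(asLittleEndian(raw)); // prints 78
    }
}
```

Under that assumption the first two bytes of the reply were 4E 00, which is 78 when read little-endian. That would be consistent with the service on port 2983 answering in a wire protocol other than the one the LogProxy client expects (for example, MySQL-protocol packets begin with a little-endian payload length), but this is a guess from the number alone, not something the logs confirm.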
【Steps to reproduce】The error occurs on every startup. My implementation code is below:
Properties properties = new Properties();
properties.put("transforms", "unwrap");
properties.put("transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState");
properties.put("transforms.unwrap.add.fields", "op");
properties.put("transforms.unwrap.delete.handling.mode", "rewrite");
properties.put("column.mappers", SkipDefaultCreatedTimeConverter.class.getName());
SourceFunction<String> source = OceanBaseSource.<String>builder()
        .startupOptions(StartupOptions.initial())
        .hostname(mysqlHost)
        .port(2883)
        .username(mysqlUsername)
        .password(mysqlPassword)
        .tenantName(tenantName)
        .databaseName(dataBaseName)
        .tableName(tableName)
        .logProxyHost(mysqlHost)
        .logProxyPort(2983)
        .rsList("10.10.53.68:2882:2881")
        .serverTimeZone("+08:00")
        .deserializer(new JsonDebeziumDeserializationSchema())
        .debeziumProperties(properties)
        .build();
DataStreamSource<String> stream = env.addSource(source, sourceName);
stream.print();
try {
    // Flink execution switched to asynchronous submission
    JobClient jobClient = env.executeAsync("cdc任务-监听实时数据");
} catch (Exception e) {
    throw new RuntimeException(e);
}