Flink同步mysql数据到OB报错

【测试环境】

【 OB 】

【 4.2.1】

【问题描述】

Flink同步mysql数据到OB报错

相关报错:

Caused by: java.sql.SQLException: Not supported feature or function

at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)

at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)

at com.mysql.cj.jdbc.ClientPreparedStatement.executePreparedBatchAsMultiStatement(ClientPreparedStatement.java:603)

at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:431)

at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:795)

at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65)

at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64)

at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:102)

at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:246)

at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:216)

… 16 more

2 个赞

请提供一下背景信息,包括但不限于集群环境(Flink版本、组件版本)、测试表结构,以及报错任务的完整日志文件。

1 个赞

补充背景信息:

功能:flink sql通过mysql cdc读mysql写oceanbase
运行版本信息:
flink 1.17.2
source: mysql cdc 版本为3.0.0
sink : jdbc connector 包为flink-connector-jdbc-3.1.2-1.17.jar
oceanbase版本:4.2.1
表现:
全量消费无异常,后面增量消费小流量的时候没问题,到凌晨四点多有一个波峰,遇到这个波峰就失败
报错:Caused by: java.sql.SQLException: Not supported feature or function

详细报错:
2024-03-01 04:26:13
org.apache.flink.runtime.JobException: Recovery is suppressed by FailureRateRestartBackoffTimeStrategy(FailureRateRestartBackoffTimeStrategy(failuresIntervalMS=300000,backoffTimeMS=10000,maxFailuresPerInterval=10)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:258)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:249)
at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:242)
at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:748)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:725)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:80)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:479)
at jdk.internal.reflect.GeneratedMethodAccessor72.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: java.io.IOException: Writing records to JDBC failed.
at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.writeRecord(JdbcOutputFormat.java:198)
at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:57)
at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Thread.java:834)
Suppressed: java.lang.RuntimeException: Writing records to JDBC failed.
at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.close(JdbcOutputFormat.java:265)
at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.close(GenericJdbcSinkFunction.java:70)
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:115)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125)
at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1043)
at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255)
at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:951)
at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:934)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:934)
… 3 more
Caused by: java.lang.RuntimeException: Writing records to JDBC failed.
at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.checkFlushException(JdbcOutputFormat.java:181)
at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:212)
at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.close(JdbcOutputFormat.java:262)
… 16 more
Caused by: java.io.IOException: java.sql.SQLException: Not supported feature or function
at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:222)
at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:155)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
… 1 more
Caused by: java.sql.SQLException: Not supported feature or function
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
at com.mysql.cj.jdbc.ClientPreparedStatement.executePreparedBatchAsMultiStatement(ClientPreparedStatement.java:603)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:431)
at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:795)
at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65)
at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64)
at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:102)
at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:246)
at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:216)
… 7 more
Caused by: java.io.IOException: java.sql.SQLException: Not supported feature or function
at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:222)
at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.writeRecord(JdbcOutputFormat.java:195)
… 15 more
Caused by: java.sql.SQLException: Not supported feature or function
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
at com.mysql.cj.jdbc.ClientPreparedStatement.executePreparedBatchAsMultiStatement(ClientPreparedStatement.java:603)
at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:431)
at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:795)
at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65)
at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64)
at org.apache.flink.connector.jdbc.internal.executor.TableBufferReducedStatementExecutor.executeBatch(TableBufferReducedStatementExecutor.java:102)
at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:246)
at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:216)
… 16 more

CREATE TABLE if not exists log (
id BIGINT,
xx STRING,
xx BIGINT,
xx TINYINT,
xx STRING,
xx INT,
xx STRING,
xx TINYINT,
xx String ,
xx TIMESTAMP ,
xx TIMESTAMP ,
xx TIMESTAMP ,
xx STRING,
xx STRING,
xx STRING,
xx STRING ,
xx TIMESTAMP,
xx TIMESTAMP,
xx BIGINT ,
xx BIGINT ,
xx TINYINT,
xx TINYINT ,
xx TINYINT,
xx STRING,
xx STRING,
xx STRING,
xx TINYINT,
xx TIMESTAMP,
PRIMARY KEY (id) NOT ENFORCED) WITH (
‘connector’ = ‘mysql-cdc’,
‘hostname’ = ‘xx’,
‘port’ = ‘6007’,
‘username’ = ‘xx’,
‘password’ = ‘xx’,
‘database-name’ = ‘xx’,
‘scan.startup.mode’ = ‘latest-offset’,
‘table-name’ = ‘xx’,
‘server-id’ = ‘401-402’
);

CREATE TABLE flink_oceanbase_sink_id (
id BIGINT,
PRIMARY KEY (id) NOT ENFORCED)
WITH (
‘connector’ = ‘jdbc’,
‘url’ = ‘jdbc:mysql://xx:3306/test’,
‘username’ = ‘xx’,
‘password’ = ‘xx’,
‘table-name’ = ‘flink_oceanbase_sink_id’,
‘sink.parallelism’=‘5’
);

insert into flink_oceanbase_sink_id select id from log;

看报错应该是flink使用的jdbc驱动器和数据库的兼容性问题

1 个赞

那这种兼容性问题应该怎么解呀

这个有人能看下吗

jdbcUrl 里试试加上 rewriteBatchedStatements=true

好的,我先加上试试,不过是从哪方向看出来可能是这个问题的呀

https://blog.csdn.net/qq_44413835/article/details/117113156 这个参数的优化原理在这里

加上之后还是报错

这个各位老师能看下吗,加上参数还会报这个错,是不是哪里和MySQL不兼容

查询下GV$OB_SQL_AUDIT,看下具体是哪条语句报错Not supported feature or function

报错的时候用obdiag 工具分析下日志看,obdiag analyze log --since 10m (分析最近10分钟的日志)

这个照着执行了,没找到对应时间点的任务,是不是异常任务这个表记录不到

select TRACE_ID,usec_to_time(request_time), query_sql from gv$ob_sql_audit where tenant_id= xxx and query_sql != ‘’ and RET_CODE="-4007" order by request_time desc; 查看下报错的query_sql是什么,tenant_id换成实际的用户id


查到是空

可能sql audit被淘汰掉了,调大 ob_sql_audit_percentage 后再次执行下看看。主要是得捞出是哪条sql报错的


从这个能看出什么来吗,现在还没捞到具体的SQL

通过proxy日志的捞的话,可以看下obproxy_digest.log或者obproxy_error.log