flink自定义oceanbase的sink将计算结果数据写入ob中, 程序运行几小时之后, 就提示这个报错,com.oceanbase.jdbc.internal.util.exceptions.OceanBaseSqlException: Broken pipe (Write failed)
at com.oceanbase.jdbc.internal.util.exceptions.OceanBaseSqlException.of(OceanBaseSqlException.java:79)
at com.oceanbase.jdbc.internal.protocol.AbstractQueryProtocol.exceptionWithQuery(AbstractQueryProtocol.java:196)
at com.oceanbase.jdbc.internal.protocol.AbstractQueryProtocol.executeQuery(AbstractQueryProtocol.java:307)
at com.oceanbase.jdbc.internal.protocol.AbstractQueryProtocol.executeQuery(AbstractQueryProtocol.java:278)
at com.oceanbase.jdbc.internal.protocol.AbstractQueryProtocol.setAutoCommit(AbstractQueryProtocol.java:1761)
at com.oceanbase.jdbc.OceanBaseConnection.setAutoCommit(OceanBaseConnection.java:871)
at com.zj.shugong.flink.sink.IdxOracleSinkWithDim.invoke(IdxOracleSinkWithDim.java:90)
at com.zj.shugong.flink.sink.IdxOracleSinkWithDim.invoke(IdxOracleSinkWithDim.java:19)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
at org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunction.apply(PassThroughWindowFunction.java:36)
at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:46)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1518)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$10(StreamTask.java:1509)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLNonTransientConnectionException: Broken pipe (Write failed)
at com.oceanbase.jdbc.internal.protocol.AbstractQueryProtocol.handleIoException(AbstractQueryProtocol.java:2513)
… 35 more
Caused by: java.net.SocketException: Broken pipe (Write failed)
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at com.oceanbase.jdbc.internal.io.output.StandardPacketOutputStream.flushBuffer(StandardPacketOutputStream.java:135)
at com.oceanbase.jdbc.internal.io.output.AbstractPacketOutputStream.flush(AbstractPacketOutputStream.java:204)
at com.oceanbase.jdbc.internal.protocol.AbstractQueryProtocol.executeQuery(AbstractQueryProtocol.java:298)
… 34 more
看样子是用oceanbase-client写ob oracle的时候连接被关闭了,这个可能的原因有很多,比如达到了 observer 的 wait_timeout,网络问题等等。
您可以先试试在连接中加上 autoReconnect=true
,并且确认下 socketTimeout 是否小于 observer 的 wait_timeout,如果是使用的连接池,可以设置一下 validationQuery,比如 select 1 from dual
。
额,socketTimeout 和wait_timeout 的参数怎么查询?
socketTimeout是oceanbase-client的参数,如果没配置的话默认是10s。
wait_timeout可以通过 show variables like 'wait_timeout'
查到。