Flink-connector-oceanbase

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/D:/repo/org/slf4j/slf4j-reload4j/1.7.36/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/D:/repo/org/apache/logging/log4j/log4j-slf4j-impl/2.17.1/log4j-slf4j-impl-2.17.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Reload4jLoggerFactory]
log4j:WARN No appenders could be found for logger (org.apache.flink.table.module.ModuleManager).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread “main” org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table ‘default_catalog.default_database.t_sink’.

Table options are:

‘buffer-flush.batch-size’=‘100’
‘buffer-flush.buffer-size’=‘5000’
‘buffer-flush.interval’=‘1s’
‘compatible-mode’=‘mysql’
‘connection-pool-properties’=‘druid.initialSize=10;druid.maxActive=100’
‘connector’=‘oceanbase’
‘max-retries’=‘3’
‘password’=’******’
‘table-name’=‘t_sink’
‘upsert-mode’=‘true’
‘url’=‘jdbc:oceanbase://127.0.0.1:2881/test’
‘username’=‘root@test’
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:270)
at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:459)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:236)
at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:194)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:194)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1803)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:881)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:989)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:765)
at com.mojie.platform.job.Main.main(Main.java:37)
Caused by: org.apache.flink.table.api.ValidationException: One or more required options are missing.

Missing required options are:

schema-name
at org.apache.flink.table.factories.FactoryUtil.validateFactoryOptions(FactoryUtil.java:612)
at org.apache.flink.table.factories.FactoryUtil.validateFactoryOptions(FactoryUtil.java:582)
at org.apache.flink.table.factories.FactoryUtil$FactoryHelper.validate(FactoryUtil.java:930)
at com.oceanbase.connector.flink.OceanBaseDynamicTableSinkFactory.createDynamicTableSink(OceanBaseDynamicTableSinkFactory.java:39)
at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSink(FactoryUtil.java:267)
… 19 more

github上的demo

这里还需要指定一下 schema-name,demo 更新的时候有些遗漏,我尽快改一下。