请教一个问题,通过flink监听kafka消息处理后通过CDC输出数据至ob, 基于jdbc mysql的方式进行输出(见以下代码),发现一个奇怪的问题,flink程序运行几秒后会报错(报Not supported feature or function),但实际有输出成功部分数据输出至ob中(后面不再持续更新),但我将ob换成mysql后同样的代码没有任何问题,想老师帮忙看下是什么问题?
ob flink cdc sink建表语句 :
create table flink_page_view
(
visit_date varchar(20),
page_id varchar(40),
pre_page_id varchar(40),
visits bigint ,
CONSTRAINT pk_pv primary key(visit_date, page_id, visits) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.1.12.47:2883/remstestdb',
'username' = 'root',
'password' = '******',
'table-name' = 'flink_page_view'
);
flink插入ob代码片段:
val insertSQL: StringBuilder = new StringBuilder
insertSQL.append(" insert into flink_page_view (select visit_date , page_id, pre_page_id, visits from page_view)")
tableEnv.executeSql(insertSQL.toString())
基于mysql sink表输出没有任何问题,以下是mysql sink表的建表语句
create table flink_page_view
(
visit_date varchar(20),
page_id varchar(40),
pre_page_id varchar(40),
visits bigint ,
CONSTRAINT pk_pv primary key(visit_date, page_id, visits) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.1.12.26:3306/remstestdb',
'username' = 'rems',
'password' = '******',
'table-name' = 'flink_page_view'
);