【 使用环境 】生产环境
【 OB or 其他组件 】ocp部署的oceanbase4.5版本集群
【 使用版本 】
【问题描述】 在 Flink Streaming SQL 作业中,使用 Kafka → OBKV HBase Sink。
当 SQL 中使用 UNION 合并流时,即使 sink 的 buffer-flush.interval 和 buffer-flush.buffer-size 设置得非常小,仍然会导致 OceanBase 集群出现明显异常(事务等问题)。 将 UNION 改为 UNION ALL 后,OceanBase 集群正常。
sql如下
– =========================
– Minimal Repro SQL
– Flink Streaming SQL + OBKV HBase
– UNION causes OceanBase cluster issue, UNION ALL works fine
– =========================
– Kafka source (desensitized)
CREATE TABLE source_binlog (
owner_id STRING,
account_type BIGINT,
currency BIGINT,
tag BIGINT,
balance DECIMAL(38,20),
updated_at STRING
) WITH (
‘connector’ = ‘kafka’,
‘topic’ = ‘test_topic’,
‘properties.bootstrap.servers’ = ‘broker1:9092’,
‘scan.startup.mode’ = ‘latest-offset’,
‘format’ = ‘json’
);
– OBKV HBase sink (desensitized)
CREATE TABLE sink_obkv (
rowkey STRING,
cf ROW,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
‘connector’ = ‘obkv-hbase’,
‘odp-mode’ = ‘true’,
‘odp-ip’ = ‘ob-proxy.example.com’,
‘odp-port’ = ‘2885’,
‘schema-name’ = ‘test’,
‘table-name’ = ‘kv_account_table’,
‘username’ = ‘ob_user@tenant#cluster’,
‘password’ = ‘******’,
‘buffer-flush.interval’ = ‘2s’,
‘buffer-flush.buffer-size’ = ‘2000’
);
– stream A
CREATE TEMPORARY VIEW stream_a AS
SELECT * FROM source_binlog
WHERE account_type IN (1,2);
– stream B
CREATE TEMPORARY VIEW stream_b AS
SELECT * FROM source_binlog
WHERE account_type IN (3);
–
Problematic UNION
CREATE TEMPORARY VIEW union_view AS
SELECT
CONCAT(owner_id,’’,currency,’’,account_type,’’,tag,’’,updated_at) AS rowkey,
ROW(CAST(balance AS STRING)) AS cf
FROM (
SELECT * FROM stream_a
UNION – Using UNION causes OceanBase cluster abnormal
SELECT * FROM stream_b
);
– sink
INSERT INTO sink_obkv
SELECT rowkey, cf
FROM union_view;
【问题咨询】
- 在 Flink SQL中,
UNION是否会改变数据下发节奏或聚合写入行为,从而放大 OBKV HBase 场景下的热点或事务冲突? - 在 OBKV HBase 多版本写入模型下,是否不建议在无界流中使用
UNION?