Flink SQL中使用UNION写入OceanBase OBKV HBase 会导致集群异常

【 使用环境 】生产环境
【 OB or 其他组件 】ocp部署的oceanbase4.5版本集群
【 使用版本 】
【问题描述】 在 Flink Streaming SQL 作业中,使用 Kafka → OBKV HBase Sink。
当 SQL 中使用 UNION 合并流时,即使 sink 的 buffer-flush.intervalbuffer-flush.buffer-size 设置得非常小,仍然会导致 OceanBase 集群出现明显异常(事务等问题)。 将 UNION 改为 UNION ALL 后,OceanBase 集群正常。

sql如下
– =========================
– Minimal Repro SQL
– Flink Streaming SQL + OBKV HBase
– UNION causes OceanBase cluster issue, UNION ALL works fine
– =========================

– Kafka source (desensitized)
CREATE TABLE source_binlog (
owner_id STRING,
account_type BIGINT,
currency BIGINT,
tag BIGINT,
balance DECIMAL(38,20),
updated_at STRING
) WITH (
‘connector’ = ‘kafka’,
‘topic’ = ‘test_topic’,
‘properties.bootstrap.servers’ = ‘broker1:9092’,
‘scan.startup.mode’ = ‘latest-offset’,
‘format’ = ‘json’
);

– OBKV HBase sink (desensitized)
CREATE TABLE sink_obkv (
rowkey STRING,
cf ROW,
PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
‘connector’ = ‘obkv-hbase’,
‘odp-mode’ = ‘true’,
‘odp-ip’ = ‘ob-proxy.example.com’,
‘odp-port’ = ‘2885’,
‘schema-name’ = ‘test’,
‘table-name’ = ‘kv_account_table’,
‘username’ = ‘ob_user@tenant#cluster’,
‘password’ = ‘******’,
‘buffer-flush.interval’ = ‘2s’,
‘buffer-flush.buffer-size’ = ‘2000’
);

– stream A
CREATE TEMPORARY VIEW stream_a AS
SELECT * FROM source_binlog
WHERE account_type IN (1,2);

– stream B
CREATE TEMPORARY VIEW stream_b AS
SELECT * FROM source_binlog
WHERE account_type IN (3);

:warning: Problematic UNION
CREATE TEMPORARY VIEW union_view AS
SELECT
CONCAT(owner_id,’’,currency,’’,account_type,’’,tag,’’,updated_at) AS rowkey,
ROW(CAST(balance AS STRING)) AS cf
FROM (
SELECT * FROM stream_a
UNION – Using UNION causes OceanBase cluster abnormal
SELECT * FROM stream_b
);

– sink
INSERT INTO sink_obkv
SELECT rowkey, cf
FROM union_view;

【问题咨询】

  1. 在 Flink SQL中,UNION 是否会改变数据下发节奏或聚合写入行为,从而放大 OBKV HBase 场景下的热点或事务冲突?
  2. 在 OBKV HBase 多版本写入模型下,是否不建议在无界流中使用 UNION
1 个赞

会改变

  • UNION 在流式场景下会引入一个去重算子(通常是基于 ROW_NUMBER 或 Aggregate 实现)
  • 这个算子需要维护状态来判断重复,可能会引入以下影响:
  • 数据缓冲延迟:等待足够的数据来判断重复
  • 批量释放:checkpoint 或 watermark 推进时批量输出
  • 下游压力突增:原本均匀的流变成脉冲式写入
  1. OBKV HBase 特性:
  • 使用 timestamp 作为版本号

  • 批量写入同一 rowkey 会触发行级锁竞争

  • 事务冲突会导致重试,进一步放大压力
    建议在无界流中使用 UNION ALL 而非 UNION