🚀 基于 Flink 的 OceanBase AP 实时分析 demo

先看效果

2024-05-09-中文版-带二维码.2024-06-28 15_33_34

  • 支持 PC 端下单,也支持多人通过手机扫码在线下单,可交互性更强。
  • 订单数据写入 OB TP 数据库,并通过 Flink CDC 实时同步到 OB AP 数据库,并从 AP 库中查询最新数据。
  • 不管是 count(*) 计数还是 where + group by + order by 多条件查询,亿级别的数据查询耗时基本都在 10ms ~ 100ms,充分体现了 OB AP 的查询性能。(注: 该耗时包括了后端到数据库之间的网络延时,因此数据库内部的 SQL 耗时还要更低)
  • 只建了主键索引。

准备数据库

  • TP 数据库: OB 4.3.0 行存 + 开启 Binlog 服务
  • AP 数据库: OB 4.3.0 列存

使用 OBCloud 阿里云版本

  • 为了降低搭建成本,方便后续和 Flink 以及应用进行集成,直接使用 OBCloud 阿里云版本,数据库配置如下:

    • OB 版本 4.3.0.1,目前该版本在 OBCloud 需要开通白名单才能购买,具体可以联系官方技术服务同学进行开通。
    • 我这里购买一个按需付费的集群实例,3 节点,节点规格为 14C70G,价格 ¥32/小时。如果只用于测试,可以申请 :tada: OB Cloud 的 30 天免费试用,最低规格的 1C4G 就可以满足需求。

  • 在集群实例下创建了 oltp 和 olap 两个租户,用作 TP 和 AP 库,配置如下:

  • 在两个租户下分别创建数据库、访问账号,并配置 IP 白名单和公网访问地址,然后就能得到数据库连接串。具体过程不赘述,可以参考我之前的文章 使用阿里云的 OceanBase 云服务

TP 库开通 Binlog 服务

行存表和列存表

  • 连接 TP 库,创建 tp_car_orders 表(行存):
create table tp_car_orders(
    order_id bigint primary key NOT NULL AUTO_INCREMENT,
    -- order_time 默认值由数据库自动生成
    order_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, 
    customer_name varchar(25) COLLATE utf8mb4_bin NOT NULL, 
    sale_nation varchar(25) COLLATE utf8mb4_bin NOT NULL,
    sale_region varchar(25) COLLATE utf8mb4_bin NOT NULL,                   
    car_color varchar(25) COLLATE utf8mb4_bin NOT NULL, 
    car_price decimal(15,2) NOT NULL
);
  • 连接 AP 库,创建 ap_car_orders 表(列存)。

    • 由于两张表需要通过 Flink 进行同步,因此两张表的结构几乎完全一样。
    • 唯二的区别: ① 一个是行存,一个是列存 ② order_time 的默认值不同。
create table ap_car_orders(
    order_id bigint primary key NOT NULL AUTO_INCREMENT,
    -- order_time 从 tp_car_orders 表同步过来
    order_time timestamp NOT NULL, 
    customer_name varchar(25) COLLATE utf8mb4_bin NOT NULL, 
    sale_nation varchar(25) COLLATE utf8mb4_bin NOT NULL,
    sale_region varchar(25) COLLATE utf8mb4_bin NOT NULL,                   
    car_color varchar(25) COLLATE utf8mb4_bin NOT NULL, 
    car_price decimal(15,2) NOT NULL
) WITH COLUMN GROUP (each column);

准备 Flink

开通阿里云 Flink

  • 本地安装并使用 Flink 进行同步可以参考 OB 官方文档,我这里为了简单,直接使用 阿里云 Flink 托管版本。包年包月比较贵,可以使用按量付费版本,按照指引开通服务即可。

Flink 网络配置

  • 进入 Flink 工作空间,使用前建议使用网络探测功能验证数据库是否可正常连接。

  • 如果 Flink 和数据库在同一个 VPC: 可以通过数据库私网地址进行连接。

  • 如果 Flink 和数据库不在同一个 VPC:

    • 配置 跨 VPC 访问
    • 通过数据库公网地址进行连接,但阿里云 Flink 默认不支持访问公网,需要配置 NAT 网关实现 VPC 网络与公网网络互通,详见 文档
  • 保证 Flink 和数据库可以互通之后,网络探测的结果应该如下:

Flink 同步配置

  • 进入「配置管理」,修改作业默认配置,将「系统检查点间隔」和「两次系统检查点间最短间隔」两个参数均改为 1 秒,以保证同步的实时性。

Flink SQL 开发

  • 进入「SQL 开发-新建-新建空白的留作业草稿」

  • 输入以下 SQL 内容:
-- 创建 TP CDC 表,表结构和源表 tp_car_orders 一致
CREATE TEMPORARY TABLE tp_car_orders_cdc (
    order_id bigint primary key NOT ENFORCED,
    order_time timestamp NOT NULL, 
    customer_name varchar(25) NOT NULL, 
    sale_nation varchar(25) NOT NULL,
    sale_region varchar(25) NOT NULL,                   
    car_color varchar(25) NOT NULL, 
    car_price decimal(15,2) NOT NULL
) WITH (
    -- OB Binlog 服务兼容 MySQL BinLog 服务,因此可以使用 mysql-cdc 作为连接器
    'connector' = 'mysql-cdc',
    'hostname' = <HOST>,
    'port' = <PORT>,
    'username' = <USER_NAME>,
    'password' = <PASSWORD>,
    'database-name' = 'oltp',
    'table-name' = 'tp_car_orders'
);

-- 创建 AP CDC 表,表结构和目标表 ap_car_orders 一致
CREATE TEMPORARY TABLE ap_car_orders_cdc (
    order_id bigint primary key NOT ENFORCED,
    order_time timestamp NOT NULL, 
    customer_name varchar(25) NOT NULL, 
    sale_nation varchar(25) NOT NULL,
    sale_region varchar(25) NOT NULL,                   
    car_color varchar(25) NOT NULL, 
    car_price decimal(15,2) NOT NULL
) WITH (
    -- 使用 jdbc 连接器
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://<HOST>:<PORT>/olap',
    'username' = <USER_NAME>,
    'password' = <PASSWORD>,
    'table-name' = 'ap_car_orders',
    -- 不缓存记录,直接 flush 数据
    'sink.buffer-flush.max-rows' = '0',
    -- flush 数据的时间间隔设为 0,直接 flush 数据
    'sink.buffer-flush.interval' = '0'
);

-- 将 TP CDC 表同步到 AP CDC 表
INSERT INTO ap_car_orders_cdc SELECT * FROM tp_car_orders_cdc;
  • :loudspeaker: 注意: 需要调整 ap_car_orders_cdc 以下两个参数,同样的为了保证同步的实时性,我这里均设为 '0' ,即不做缓存和间隔,直接 flush 数据。这两个参数的用法详见 文档

    • sink.buffer-flush.max-rows
    • sink.buffer-flush.interval
  • SQL 语法检查和网络连通性校验:

  • 启动 SQL 调试,并往 tp_car_orders 表里插入一条数据:
INSERT INTO `tp_car_orders` (`car_price`,`car_color`,`sale_region`,`sale_nation`,`customer_name`) VALUES (299900,'blue','Washington','America','Lucy');
  • 预期能够捕获到 tp_car_orders 的数据变更,但变更还不会写入 ap_car_orders,需要实际部署到作业才能写入。

Flink SQL 部署

  • 部署 SQL 作业:

  • 在「作业运维」即可查看对应作业:

  • 部署并启动成功后,重新往 tp_car_orders 表里插入一条数据:
INSERT INTO `tp_car_orders` (`car_price`,`car_color`,`sale_region`,`sale_nation`,`customer_name`) VALUES (299900,'blue','Washington','America','Lucy');
  • 可以看到在 ap_car_orders 表中数据已经同步:
mysql> select * from ap_car_orders;
+----------+---------------------+---------------+-------------+-------------+-----------+-----------+
| order_id | order_time          | customer_name | sale_nation | sale_region | car_color | car_price |
+----------+---------------------+---------------+-------------+-------------+-----------+-----------+
|        1 | 2024-05-07 17:10:37 | Lucy          | America     | Washington  | blue      | 299900.00 |
+----------+---------------------+---------------+-------------+-------------+-----------+-----------+
1 row in set (0.02 sec)

应用配置

OLTP_DATABASE_URL=""
OLAP_DATABASE_URL=""
  • pnpm run dev 启动应用。

导入数据

  • clone 代码仓库,通过 npm run seed批量导入脚本,默认导入 1.5 亿条数据,可修改脚本逻辑按需调整:

  • 导入数据后需要对 AP 库发起一次合并,才能保证最优的查询性能。
mysql> ALTER SYSTEM MAJOR FREEZE;

AP SQL 调优(可选)

运行效果

10 个赞

太赞了,干货满满:+1: :star_struck:

5 个赞

欢迎来关注DeveloperHub:OceanBase 社区
这里的目标是汇聚应用开发所需的示例程序、数据集成与开发工具,以及相关的文档和学习资源。

当然,非常欢大家迎通过 GitHub 仓库中提交 Pull Request来一起完善示例程序呀
GitHub 仓库地址:GitHub - oceanbase/ob-samples: Show how to use OceanBase, providing the sample projects for OceanBase.

3 个赞

ob支持行列混存,一份数据就可以了,为什么还要同步呢?

3 个赞

这个 demo 主要想体现 OB 的两个能力:

  • 实时 AP (列存) 能力
  • 和 Flink 等大数据生态的对接能力

上游 TP 库也可以是 MySQL 或其他数据库,这也是目前社区和行业实践比较多的架构,用不同的数据库分别支撑 TP 和 AP 场景。因为 TP 库一般不会轻易替换,引入单独的 AP 库则简单很多。

至于 OB 的行列混存能力,则是使用一套数据库同时支持 TP 和 AP 业务,这也是我们后续推荐的另一种解决方案,但这个方案适合 TP 和 AP 库都用 OB 的用户。

3 个赞

恩,我猜也是只是为了模拟

大佬,ob的行列混存是在建表的时候默认了还是需要什么关键字设置一下

需要在建表时进行指定,详见官方文档 数据表设计最佳实践-列存选择