OceanBase + flink + OceanBase Binlog 服务 V4.0.1 可以实现数据同步吗

【 使用环境 】生产环境 or 测试环境
【 OB or 其他组件 】由于cdc模式 没有高可用,生产环境存单点故障。准备使用binlog模式作为同步基础,OceanBase + flink + OceanBase Binlog 服务 V4.0.1 这种可以实现数据同步吗
【 使用版本 】OceanBase 4.2.2.1/4.2.1.8 + flink + OceanBase Binlog 服务 V4.0.1
【问题描述】清晰明确描述问题
【复现路径】问题出现前后相关操作
【附件及日志】推荐使用OceanBase敏捷诊断工具obdiag收集诊断信息,详情参见链接(右键跳转查看):

【SOP系列 22 】——故障诊断第一步(自助诊断和诊断信息收集)

【备注】基于 LLM 和开源文档 RAG 的论坛小助手已开放测试,在发帖时输入 [@论坛小助手] 即可召唤小助手,欢迎试用!

1 个赞

根据文档库中的信息,OceanBase + Flink + OceanBase Binlog 服务 V4.0.1 可以实现数据同步。但是需要注意以下几点:

  1. 版本兼容性

    • 确保 OceanBase 版本为 V4.2.2.1 或 V4.2.1.8,这些版本与 Flink 和 OceanBase Binlog 服务 V4.0.1 具有良好的兼容性。
  2. 高可用性

    • 由于 CDC 模式没有高可用,生产环境中存在单点故障的风险。使用 Binlog 模式可以提高数据同步的可靠性,但仍需确保整个系统的高可用性设计,例如通过多节点部署和故障切换机制来减少单点故障的影响。
  3. 资源配置

    • 根据文档片段 4 的建议,OMS 社区版各组件的标准资源规格如下:
      • Store:
        • CPU: 4C
        • Memory: 8G
        • Disk: 取决于数据库端增量日志的生产速度和 OMS 中增量数据的保存时间
      • Incr-Sync:
        • CPU: 2C
        • Memory: 4G
        • Disk: 5G
      • Full-Import:
        • CPU: 2C
        • Memory: 4G
        • Disk: 5G
      • 总计:
        • CPU: 8C
        • Memory: 16G
        • Disk: >10G
  4. 注意事项

    • 生成列:如果表中包含生成列,确保这些列标记为 STORED 属性,否则同步目标端时该列将同步为 NULL 值。
    • LOB 列:如果表中包含 LOB 列(如 JSON、GIS、XML、UDT、LONGTEXT、MEDIUMTEXT 等),在 UPDATEDELETE 操作时需注意 LOB 列的值可能为 NULL。
    • 时钟同步:确保节点之间的时钟同步,避免增量同步的延迟时间不准确。
    • 断点续传:如果任务意外中断,RocketMQ 实例中可能会存在部分重复数据,下游系统需要具备排重能力。
  5. 配置示例

    • 关闭 enable_rebalance 参数以确保 LOB 字段的正确处理:
      SHOW PARAMETERS LIKE 'enable_rebalance';
      ALTER SYSTEM SET enable_rebalance = false;
      

综上所述,OceanBase + Flink + OceanBase Binlog 服务 V4.0.1 可以实现数据同步,但需要关注上述提到的兼容性、高可用性、资源配置和注意事项。希望这些信息对您有所帮助。如有进一步的问题,请联系 OceanBase 的人工答疑以获取更多帮助。

更多信息请查看:

  1. 创建 OceanBase 社区版至 RocketMQ 的数据同步任务
  2. OceanBase Store RPC 访问 OceanBase 集群失败
  3. 如何处理乱码无法写入 OceanBase 数据库 V3.1.2 的问题
  4. 内存和磁盘要求
  5. 创建索引或约束
  6. 约束和索引操作
  7. 旁路导入介绍
  8. 0001 ~ 3999
1 个赞

可以实现的。可参考小助手的回复

1 个赞

FlinkCDC 是 Apache Flink 的一个连接器,用于从各种数据源(包括数据库)中捕获变更数据并实时处理。对于 OceanBase,FlinkCDC 可以用于捕获 OceanBase 的 Binlog 并实现实时数据同步。以下是关于使用 FlinkCDC 与 OceanBase 进行数据同步的一些关键点:

1. 版本兼容性

  • OceanBase 版本:确保 OceanBase 版本为 V4.2.2.1 或 V4.2.1.8。
  • Flink 版本:确保使用的 Flink 版本与 FlinkCDC 兼容。通常,FlinkCDC 支持最新的 Flink 版本。

2. 配置 FlinkCDC

  • 依赖项:在 Flink 项目中添加 FlinkCDC 的依赖项。例如,如果你使用的是 Maven,可以在 pom.xml 中添加以下依赖:

    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-cdc</artifactId>
        <version>2.2.0</version>
    </dependency>
    
  • 连接配置:配置 FlinkCDC 连接到 OceanBase 的参数。以下是一个示例配置:

    Properties properties = new Properties();
    properties.setProperty("username", "your_username");
    properties.setProperty("password", "your_password");
    properties.setProperty("server-timezone", "UTC");
    properties.setProperty("useSSL", "false");
    
    DebeziumSourceFunction<String> sourceFunction = DebeziumSourceFunction.<String>builder()
        .hostname("your_ob_host")
        .port(2881)
        .database("your_database")
        .tableList("your_table")
        .properties(properties)
        .deserializer(new JsonDebeziumDeserializationSchema())
        .build();
    

3. 数据同步流程

  • 启动 Flink 作业:使用 Flink 的 StreamExecutionEnvironment 启动作业,并将 FlinkCDC 作为数据源。
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> stream = env.addSource(sourceFunction);
    stream.print();
    env.execute("FlinkCDC to OceanBase");
    

4. 注意事项

  • 高可用性:虽然 FlinkCDC 本身提供了高可用性,但在生产环境中仍需确保整个系统的高可用性设计,例如通过多节点部署和故障切换机制来减少单点故障的影响。
  • 性能优化:根据实际需求调整 Flink 作业的并行度和资源分配,以优化性能。
  • 数据一致性:确保 FlinkCDC 捕获的数据与 OceanBase 中的数据保持一致,特别是在处理 UPDATEDELETE 操作时。

5. 示例代码

以下是一个完整的示例代码,展示了如何使用 FlinkCDC 从 OceanBase 捕获变更数据并打印出来:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class FlinkCDCExample {
    public static void main(String[] args) throws Exception {
        // 创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 配置连接参数
        Properties properties = new Properties();
        properties.setProperty("username", "your_username");
        properties.setProperty("password", "your_password");
        properties.setProperty("server-timezone", "UTC");
        properties.setProperty("useSSL", "false");

        // 创建 FlinkCDC 源
        DebeziumSourceFunction<String> sourceFunction = DebeziumSourceFunction.<String>builder()
            .hostname("your_ob_host")
            .port(2881)
            .database("your_database")
            .tableList("your_table")
            .properties(properties)
            .deserializer(new JsonDebeziumDeserializationSchema())
            .build();

        // 添加数据源
        DataStream<String> stream = env.addSource(sourceFunction);

        // 打印数据
        stream.print();

        // 启动作业
        env.execute("FlinkCDC to OceanBase");
    }
}

6. 参考资料

希望这些信息对您有所帮助。如有进一步的问题,请联系 OceanBase 的人工答疑以获取更多帮助。

具体信息可参考以下文档:

  1. 900.delete-a-zone.md
  2. 300.adjust-unit-number.md
  3. 500.merge-and-splitting-resource-pools.md
  4. 100.view-resource-pools.md
  5. 200.assign-a-resource-pool-to-a-tenant.md
  6. 600.delete-resource-pool.md
  7. 100.view-resource-unit-configuration.md
  8. 200.modify-the-configuration-of-a-resource-unit.md
  9. 1500.delete-resource-specification.md
  10. 100.resource-isolation-overview.md

(小助手的答复已结束,如未能解决您的问题,请继续提问并等待其他同学的回复,谢谢!)