OMS 是否会限制增量同步中单次处理的数据量?

【 使用环境 】 测试环境
【 OB or 其他组件 】OMS
【 使用版本 】4.2.7_CE
【问题描述】 使用 OMS 配置 PGtoOB 的增量同步任务时, 当单次事务变更数据量超过一定数量,比如 100W 时, 数据未成功同步,且之后的任何变更都无法被同步。但是任务中的Store和Incr-Sync组件运行正常,只是当前拉取点位的时间不再变动,延迟时间持续增加。
日志中也只提示 :there is no data changed in the specified table of source postgresql-server.
OMS是否有参数或者配置来限制单次处理的行数或数据占用量?
PG 数据库为 15.3

【备注】基于 LLM 和开源文档 RAG 的论坛小助手已开放测试,在发帖时输入 [@论坛小助手] 即可召唤小助手,欢迎试用!

2 个赞

根据文档库中的信息,OMS 社区版确实存在对单次处理数据量的限制,尤其是在增量同步场景下。以下是针对您问题的具体分析和解答:


1. OMS 是否会限制增量同步中单次处理的数据量?

是的,OMS 社区版在增量同步过程中会对单次处理的数据量进行限制。这种限制主要体现在以下几个方面:

  • Kafka 消息大小限制
    根据文档片段 7 的描述,如果单次事务变更的数据量过大(例如超过 Kafka Server 的默认限制 message.max.bytes),可能会导致 RecordTooLargeException 错误。默认情况下,Kafka Client 的最大消息大小为 1GB (max.request.size),而 Kafka Server 的默认限制为 1MB (message.max.bytes)。如果您的单次事务变更数据量超过这些限制,可能会导致数据无法成功同步。

  • 内存与并发限制
    文档片段 10 提到,OMS 的性能调优参数中涉及源端和目标端的并发配置 (source.workerNumsink.workerNum) 以及 JVM 内存配置 (coordinator.connectorJvmParam)。如果单次事务变更的数据量过大,可能会超出 OMS 的内存或线程处理能力,从而导致任务停滞。

  • 事务大小限制
    虽然 OceanBase 数据库本身对大事务没有明确的大小限制(见文档片段 6),但在实际使用中,过大的事务可能会导致内存使用激增,进而影响 OMS 的正常运行。


2. 为什么会出现“there is no data changed in the specified table of source postgresql-server”的提示?

根据您的描述,任务中的 Store 和 Incr-Sync 组件运行正常,但拉取点位的时间不再变动,延迟时间持续增加。这可能是因为以下原因:

  • Kafka 消息过大导致丢弃
    如果单次事务变更的数据量超过了 Kafka 的限制,OMS 可能会丢弃该批次的数据,并继续尝试从下一个点位拉取数据。此时,日志中可能会显示“no data changed”的提示。

  • 任务状态异常
    如果任务因数据量过大而进入异常状态(例如暂停或失败),OMS 可能会停止更新拉取点位的时间,导致延迟持续增加。


3. 如何解决此问题?

以下是针对您问题的解决方案:

(1) 调整 Kafka 参数

  • 增加 Kafka Client 的最大消息大小:
    在 OMS 配置文件中,设置 max.request.size 参数,例如:
    max.request.size=209715200  # 单位为字节,此处设置为 200MB
    
  • 增加 Kafka Server 的最大消息大小:
    修改 Kafka Server 的配置文件,设置 message.max.bytes 参数,例如:
    message.max.bytes=209715200  # 单位为字节,此处设置为 200MB
    
    修改完成后需要重启 Kafka Server。

(2) 优化事务大小

  • 在 PostgreSQL 源端,尽量避免单次事务包含过多的数据变更。可以通过分批提交事务来减少单次变更的数据量。
  • 在 OMS 配置中,启用 skipErrorCode 参数跳过特定错误码(如 RecordTooLargeException)。例如:
    skipErrorCode: [1]
    

(3) 调整 OMS 性能参数

  • 增加源端和目标端的并发数:
    根据文档片段 10 的建议,可以适当增加 source.workerNumsink.workerNum 的值。例如:
    source.workerNum: 16
    sink.workerNum: 16
    
  • 增加 JVM 内存配置:
    根据文档片段 10 的建议,调整 -Xms-Xmx-Xmn 参数。例如:
    coordinator.connectorJvmParam: "-Xms16g -Xmx16g -Xmn12g"
    

(4) 监控任务状态

  • 在 OMS 的 增量同步页面,实时查看任务的延迟时间、同步流量和 RPS 等指标(见文档片段 4)。
  • 如果任务因数据量过大而进入异常状态,可以尝试暂停并恢复任务(见文档片段 5 和 9)。

4. 总结

OMS 社区版确实会对增量同步中单次处理的数据量进行限制,主要体现在 Kafka 消息大小、内存使用和事务大小等方面。针对您的问题,建议从以下几个方面入手:

  1. 调整 Kafka 参数以支持更大的消息大小;
  2. 优化 PostgreSQL 源端的事务大小;
  3. 调整 OMS 的性能参数以提升处理能力;
  4. 实时监控任务状态并及时排查异常。

如果上述方法仍无法解决问题,建议联系 OceanBase 的技术支持团队以获取进一步帮助。

检索到的文档如下:

  1. 创建 OceanBase 社区版至 RocketMQ 的数据同步任务
  2. 查看和修改同步对象
  3. 批量操作数据同步任务
  4. 查看数据同步任务的详情
  5. 启动和暂停数据同步任务
  6. 事务相关规范
  7. OMS 社区版错误码
  8. 迁移 HBase 数据库的数据至 OBKV
  9. OMS 社区版问题排查手册
2 个赞

组件进程限制数量为默认的平稳选项

1 个赞

根据文档库中的信息,如果您希望设置 OceanBase 组件进程限制数量为默认的平稳选项,可以参考以下内容:


1. 进程限制的相关配置

OceanBase 数据库的进程涉及多个资源限制参数,包括线程最大栈空间大小(Stack)、最大文件句柄数(Open Files)和 core 文件大小(Core File Size)。这些限制可以通过以下两种方式配置(见文档片段 1):

  • 会话级别修改
    在启动时通过命令行临时修改。例如:

    ulimit -n 65535  # 设置最大文件句柄数
    ulimit -s 65536  # 设置线程最大栈空间大小
    ulimit -c unlimited  # 设置 core 文件大小为不限制
    
  • 全局级别修改
    通过配置文件 /etc/security/limits.conf 永久生效。例如:

    oceanbase soft nofile 65535
    oceanbase hard nofile 65535
    oceanbase soft stack 65536
    oceanbase hard stack 65536
    oceanbase soft core unlimited
    oceanbase hard core unlimited
    

2. 默认的平稳选项

为了确保 OceanBase 数据库运行平稳,默认的资源限制建议如下:

  • 最大文件句柄数
    默认值为 65535,适用于大多数生产环境。如果您的系统资源充足,可以适当增加该值。

  • 线程最大栈空间大小
    默认值为 65536 KB(即 64 MB),适用于大多数场景。如果您的系统中线程数较多,可以适当降低该值以节省内存。

  • core 文件大小
    默认值为 unlimited,允许生成完整的 core 文件以便于调试。如果不需要调试功能,可以将其设置为 0


3. 总结

为了设置组件进程限制数量为默认的平稳选项,建议按照以下配置进行调整:

# 会话级别修改
ulimit -n 65535  # 最大文件句柄数
ulimit -s 65536  # 线程最大栈空间大小
ulimit -c unlimited  # core 文件大小

# 全局级别修改(写入 /etc/security/limits.conf)
oceanbase soft nofile 65535
oceanbase hard nofile 65535
oceanbase soft stack 65536
oceanbase hard stack 65536
oceanbase soft core unlimited
oceanbase hard core unlimited

如果您的系统环境特殊(例如资源受限或高并发场景),可以根据实际需求调整上述参数值。


如果您需要进一步优化或有其他疑问,请提供更多背景信息,我将尽力为您解答!

详情请参考:

  1. (可选)配置 limits.conf
  2. writing_throttling_trigger_percentage
  3. 设置并行执行参数
  4. writing_throttling_maximum_duration
  5. minor_merge_concurrency
  6. CREATE MATERIALIZED VIEW LOG
  7. merge_thread_count
  8. OceanBase 数据库多租户线程
  9. location_refresh_thread_count

(小助手的答复已结束,如未能解决您的问题,请继续提问并等待其他同学的回复,谢谢!)

1 个赞

你截图组件监控 看看

你是数据迁移 增量同步的时候 有问题是么?


当变更数据量达到一定值的时候,比如 100W 行。
增量拉取和增量同步组件的拉取点位都不会发生改变。
但是PG数据已经修改了,且事务已经提交,PG 中查询数据已变更和插入完成。
而且之后的所有变更,不管数据量,都不会同步到 OB 中。组件的位点时间一直不变。

1 个赞
检查是否有相关参数设置,比如:

    max_batch_row_count:单批次最大处理行数

    max_batch_byte_size:单批次最大处理字节数

    max_memory_limit:任务最大内存占用

如果不支持通过 UI 设置,可查看 OMS 配置文件,或联系开发团队查看内部配置/源码
  1. PostgreSQL 解码器处理异常

你提到日志中提示无数据变化,这可能意味着:

PostgreSQL 插件(如 pgoutput 或 test_decoding)未能正确解码该大事务

或者 OMS 对解码后的大事务未做分片/切分处理

建议:

查看 PostgreSQL 的 wal_level,确保是 logical

查看 PostgreSQL 日志中是否有长事务或逻辑复制槽错误

检查是否使用了支持大事务处理的逻辑解码插件(建议使用 pgoutput)
  1. WAL 槽(replication slot)异常或阻塞

大事务未被及时处理可能导致:

WAL 槽持续占用空间,甚至出现堆积,阻塞新的数据变更被解码

建议:

查看当前逻辑复制槽的状态:

SELECT * FROM pg_replication_slots;

查看 pg_stat_replication 是否显示延迟持续增加

2 个赞

还得按照官网看看有没有相关配置信息

你可以看看下面的文档 是不是可以调整一下 看看是不是组件性能问题导致的

如何设置限流限速

https://www.oceanbase.com/docs/community-oms-cn-1000000001880694

全量/增量数据迁移性能调优

https://www.oceanbase.com/docs/community-oms-cn-1000000001880793

发现再过了 50 分钟后,数据同步到了 OB 中,增加增量同步组件配置的读写线程和 JVM 内存后,再次重试相同数据量,发现还是要 50 分钟后才同步成功。
下面是相关日志。
congo_20250512.log (298.3 KB)
store_20250512.log (38.9 KB)
store_connector_20250512.log (148.8 KB)
metrics_20250512.log (1.1 MB)
connector_sink_msg_20250512.log (75.9 KB)
incr-sync_connector_20250512.log (160.0 KB)

查询 Metrics 信息

执行 ./connector_utils.sh metrics 命令,即可查询 Metrics 信息。

可以调整一下分片数 看看
每个分片记录数 source.sliceBatchSize

默认600

对于大表一般10000基本就可以满足了,太大会消耗内存很多。

通过这个监控一下jvm 看看是否是jvm不足
ps -ef|grep “组件ID”

/opt/alibaba/java/bin/jstat -gcutil {pid} 1s


企业微信截图_c99c0114-6741-41f5-b959-6a7f718fc70a

企业微信截图_d6de488b-94a5-48b4-852a-514d1510382c

source.sliceBatchSize 在哪里调整?

FGC不断增加则表示需要增加jvm内存
ready batch 数量,如果 > 10 则说明目的端写入存在瓶颈,此时如果线程数量小于 64 可以尝试 sink.workerNum 加到 64 及以上(这个是否生效取决于机器的配置和目的端的能力)。
source.sliceBatchSize配置截图的地方

1 个赞


好像没有你提供截图的的那个配置按钮。

就在更新那里 点击更新 可以看到

这是点更新后所有的配置内容,未发现关键字,sliceBatchSize 未找到,是直接添加到 source 里么?
workernum 有找到:
“root”:{

“condition”:{

“blackCondition”:

string"[{“all”:false,“sub”:[{“func”:“fn”,“name”:“drc_txn*”},{“func”:“fn”,“name”:“DRC_TXN*”},{“func”:“fn”,“name”:“delay_delete_"},{“func”:“fn”,“name”:"DELAY_DELETE_”}],“func”:“fn”,“name”:“public”,“map”:“er_nsh_payslip”}]"

“whiteCondition”:

string"[{“all”:false,“sub”:[{“name”:“payslip_template_style_rel”,“map”:“payslip_template_style_rel”},{“name”:“payslip_summary_config”,“map”:“payslip_summary_config”},{“name”:“payslip_template_config”,“map”:“payslip_template_config”},{“name”:“payslip_info_solution_batch_rel”,“map”:“payslip_info_solution_batch_rel”},{“name”:“payslip_template”,“map”:“payslip_template”},{“name”:“payslip_wage_item”,“map”:“payslip_wage_item”},{“name”:“payslip_template_config_solution_rel”,“map”:“payslip_template_config_solution_rel”},{“name”:“payslip_employee_info”,“map”:“payslip_employee_info”},{“name”:“payslip_info”,“map”:“payslip_info”},{“name”:“payslip_wage_group”,“map”:“payslip_wage_group”},{“name”:“payslip_wage_adapt_item”,“map”:“payslip_wage_adapt_item”},{“name”:“ob_person_bank_information”,“map”:“ob_person_bank_information”},{“name”:“ob_per_national_id”,“map”:“ob_per_national_id”},{“name”:“ob_payslip_employee_info_wage_item”,“map”:“ob_payslip_employee_info_wage_item”},{“name”:“ob_per_phone”,“map”:“ob_per_phone”},{“name”:“ob_per_personal”,“map”:“ob_per_personal”},{“name”:“ob_hr_employee”,“map”:“ob_hr_employee”},{“name”:“ob_per_person”,“map”:“ob_per_person”},{“name”:“ob_per_email”,“map”:“ob_per_email”},{“name”:“hr_department”,“map”:“hr_department”},{“name”:“hr_employee_eapp_external”,“map”:“hr_employee_eapp_external”},{“name”:“emp_job_eapp_external”,“map”:“emp_job_eapp_external”},{“name”:“emp_job”,“map”:“emp_job”}],“name”:“public”,“map”:“er_nsh_payslip”}]"

}

“coordinator”:{

“allowRecordTypes”:

string"DELETE,INSERT,UPDATE"

“connectorJvmParam”:

string"-Xms2048m -Xmx2048m -Xmn1024m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSClassUnloadingEnabled"

“enableActiveReportTask”:

boolfalse

“enableMetricReportTask”:

boolfalse

“enableOmsConnectorV2Report”:

booltrue

“sinkType”:

string"OB_MYSQL"

“sourceType”:

string"POSTGRESQL"

“taskIdentity”:

string"np_6qevtq34489s"

“timezone”:

string"+08:00"

}

“sink”:{

“boosterClass”:

string"com.oceanbase.oms.connector.jdbc.sink.obmysql.OBMySQLJDBCSinkBooster"

“charsetReplaceMap”:{}

“dbVersion”:

string"4.3.5.0"

“enableNoUniqueConstraintTableReplicate”:

booltrue

“endpointId”:

string"e_6j2o993zyjy8"

“ignoreRedunantColumnsReplicate”:

boolfalse

“insertMode”:

string"INSERT"

“isPreLoadAllTableSchema”:

boolfalse

“jar”:

string"jdbc-sink-ob-mysql.jar"

“jdbcUrl”:

string"jdbc:mysql://10.21.11.12:2883?useUnicode=true&allowMultiQueries=true&socketTimeout=50000&characterEncoding=utf8&readOnlyPropagatesToServer=false"

“password”:

string"******"

“timezone”:

string"+08:00"

“type”:

string"JDBC_SINK_OB_MYSQL"

“username”:

string"root"

“workerNum”:

int8

}

“source”:{

“boosterClass”:

string"com.oceanbase.oms.connector.source.store.MultiStoreSourceBooster"

“checkpoint”:{

“POSTGRESQL_np_6qevtq34489s_6qevu95mpqio-1-0”:

string"1747042038"

}

“clients”:[

0:{

“clientId”:

string"POSTGRESQL_np_6qevtq34489s_6qevu95mpqio-1-0"

“clusterManagerUrl”:

string"http://10.21.11.12:8088"

“subtopic”:

string"POSTGRESQL_np_6qevtq34489s_6qevu95mpqio-1-0"

}

]

“dbType”:

string"POSTGRESQL"

“dbVersion”:

float15.3

“ignoreDdl”:

booltrue

“jar”:

string"connector-source-store.jar"

“password”:

string"******"

“timezone”:

string"+08:00"

“type”:

string"STORE_SOURCE"

“username”:

string"RM_oms"

“workerNum”:

int8

}

}

不好意思 说错了 这个是全量组件的参数 从你后来metrics信息来看是目标端有积压

按照我后来发的 检查一下

1 个赞