oblogclient 消费数据出现重复问题

【 使用环境 】测试环境
【 OB or 其他组件 】
1、oblogclient master分支
2、oblogproxy-2.0.2-101000142024080121.el8.x86_64.rpm
【 使用版本 】
【问题描述】
oblogclient-sample 测试发现一个问题,就是测试用例的配置中如果指定了libobcdc.start_timestamp 如果指定了上次记录的位置,那么再次启动就会多消费一些日志数据。

【复现路径】

  1. 配置 oblogclient-sample/src/main/resources/application.properties
    2.执行com.oceanbase.samples.LogProxyClientSample
    3.记录每次消费的时间点或者位点 (message.getSafeTimestamp(),记录到kakfa
    4.停止LogProxyClientSample
    5.修改LogProxyClientSample从kafka读取位点,赋值

    obReaderConfig.setStartTimestamp(
             Long.parseLong(xxxxxx);
    

6.启动,打印日志发现重复数据。

问题1、请问这个重复数据如何解决,有什么办法可以精准的区分是不是重复数据?
问题2、clientConf.clientId(clientId) 中配置的clientId 有何作用,在哪里用到了吗?会通过clientId记录logproxy 消费日志到哪个位置了吗?
问题3、第二次从相同的位置消费,但是第二次checkpoint就会改变,请问这是为什么呢?
比如第一次是73815@1742522215 第二次是checkpoint:999999@1742521853。如果clientid改变了则不会出现这个问题。请问这个clientid可以随时改变吗?
问题4、oblogclient 停止之后需要等一会再启动,否则启动失败,那么这个等一会是多久呢,目测15秒以上?请问在哪里配置?

ob的版本号 发一下

ob-社区版 4.2.10

你测试的是cdc模式 还是binlog模式 你看看使用限制 oblogproxy2.02应该不支持ob421bp10 你尽量使用更高的版本

4.2.1.10 刚才说错了。应该是支持的。

你看一下 我发给你的版本支持 应该是不支持4.2.1.10的版本 oblogproxy建议使用更高的版本

@论坛小助手

根据文档库中的信息,以下是针对您提出的问题的详细解答:


问题1:重复数据如何解决?有什么办法可以精准地区分是不是重复数据?

原因分析:
在CDC模式下,libobcdc.start_timestamp 指定了消费的起始时间点。如果设置为上次记录的位置(例如 message.getSafeTimestamp()),可能会导致重复消费。这是因为 start_timestamp 并不完全精确到某个具体的日志条目,而是基于时间戳范围来定位日志。

解决方案:

  1. 去重逻辑: 在下游消费者中实现去重逻辑。可以通过唯一标识符(如主键或事务ID)来判断是否为重复数据。
  2. 调整时间戳:start_timestamp 设置为上一次消费的结束时间点加1毫秒,以避免重复消费同一时间戳的数据。
  3. 参考官方建议: 根据 CDC模式/使用CDC模式 文档,确保 start_timestamp 的精度,并结合下游消费者的幂等性设计。

问题2:clientId 的作用是什么?是否会通过 clientId 记录 logproxy 消费日志到哪个位置?

clientId 的作用:
clientId 是 oblogclient 的唯一标识符,主要用于以下场景:

  • 区分不同的客户端实例。
  • 在 oblogproxy 中进行会话管理。

是否会记录消费位置:
根据文档,clientId 本身并不会直接记录消费的日志位点。消费位点的管理依赖于下游消费者的实现(例如 Kafka 的 checkpoint 机制)。clientId 的主要作用是帮助 oblogproxy 区分不同的客户端连接。


问题3:为什么第二次从相同的位置消费时,checkpoint 会发生改变?

原因分析:

  1. 时间戳范围的影响: 即使指定了相同的 start_timestamp,由于时间戳范围内的日志可能发生变化(例如新的日志追加),checkpoint 的值可能会不同。
  2. checkpoint 的生成机制: checkpoint 是基于当前消费的最新日志位置生成的,因此即使从相同的时间戳开始消费,最终的 checkpoint 值也可能因日志的变化而不同。

关于 clientId 改变后的行为:
如果 clientId 改变,则 oblogproxy 会将其视为一个新的客户端连接,从而可能导致 checkpoint 的重新计算。

建议:

  • 确保 clientId 在同一次消费任务中保持不变。
  • 如果需要切换 clientId,建议重新初始化消费位点。

问题4:oblogclient 停止后需要等待多久再启动?在哪里配置?

原因分析:
oblogproxy 和 oblogclient 之间存在连接管理和状态同步的机制。如果 oblogclient 停止后立即重启,可能会因为以下原因导致失败:

  1. 连接未完全释放。
  2. 状态同步未完成。

等待时间:
根据实际测试,建议等待至少 15秒以上,以确保连接和状态同步完成。

配置方式:
目前没有明确的配置项可以直接控制等待时间。可以通过以下方式优化:

  1. 增加超时时间: 在 oblogproxy 的配置中,调整连接超时时间(例如 connection_timeout)。
  2. 延迟启动: 在脚本中加入延迟逻辑,确保 oblogclient 停止后有足够的缓冲时间。

综合建议:

  1. 升级版本: 建议使用更高版本的 oblogproxy 和 oblogclient,以获得更好的兼容性和稳定性。
  2. 去重逻辑: 在下游消费者中实现去重逻辑,确保数据处理的幂等性。
  3. 固定 clientId 在同一次消费任务中保持 clientId 不变,避免因切换导致的异常行为。
  4. 延迟启动: 在 oblogclient 停止后,加入适当的延迟逻辑(例如 15秒),以确保连接和状态同步完成。

如果您仍有疑问,建议参考 OceanBase 日志代理服务 官方文档,或联系 OceanBase 技术支持团队获取进一步帮助。

详情请参考:

  1. 连接管理
  2. 通过 oblogproxy 进行增量日志代理服务
  3. 身份鉴别和认证
  4. 并发控制概述
  5. 代理高可用
  6. get_tenant 失败后重试无法退出
  7. 高可用 FAQ