【 使用环境 】测试环境
【 OB or 其他组件 】
1、oblogclient master分支
2、oblogproxy-2.0.2-101000142024080121.el8.x86_64.rpm
【 使用版本 】
【问题描述】
oblogclient-sample 测试发现一个问题,就是测试用例的配置中如果指定了libobcdc.start_timestamp 如果指定了上次记录的位置,那么再次启动就会多消费一些日志数据。
【复现路径】
-
配置 oblogclient-sample/src/main/resources/application.properties
2.执行com.oceanbase.samples.LogProxyClientSample
3.记录每次消费的时间点或者位点 (message.getSafeTimestamp(),记录到kakfa
4.停止LogProxyClientSample
5.修改LogProxyClientSample从kafka读取位点,赋值
obReaderConfig.setStartTimestamp(
Long.parseLong(xxxxxx);
6.启动,打印日志发现重复数据。
问题1、请问这个重复数据如何解决,有什么办法可以精准的区分是不是重复数据?
问题2、clientConf.clientId(clientId) 中配置的clientId 有何作用,在哪里用到了吗?会通过clientId记录logproxy 消费日志到哪个位置了吗?
问题3、第二次从相同的位置消费,但是第二次checkpoint就会改变,请问这是为什么呢?
比如第一次是73815@1742522215 第二次是checkpoint:999999@1742521853。如果clientid改变了则不会出现这个问题。请问这个clientid可以随时改变吗?
问题4、oblogclient 停止之后需要等一会再启动,否则启动失败,那么这个等一会是多久呢,目测15秒以上?请问在哪里配置?
淇铭
#7
你测试的是cdc模式 还是binlog模式 你看看使用限制 oblogproxy2.02应该不支持ob421bp10 你尽量使用更高的版本
淇铭
#9
你看一下 我发给你的版本支持 应该是不支持4.2.1.10的版本 oblogproxy建议使用更高的版本
论坛小助手
#11
根据文档库中的信息,以下是针对您提出的问题的详细解答:
问题1:重复数据如何解决?有什么办法可以精准地区分是不是重复数据?
原因分析:
在CDC模式下,libobcdc.start_timestamp
指定了消费的起始时间点。如果设置为上次记录的位置(例如 message.getSafeTimestamp()
),可能会导致重复消费。这是因为 start_timestamp
并不完全精确到某个具体的日志条目,而是基于时间戳范围来定位日志。
解决方案:
-
去重逻辑: 在下游消费者中实现去重逻辑。可以通过唯一标识符(如主键或事务ID)来判断是否为重复数据。
-
调整时间戳: 将
start_timestamp
设置为上一次消费的结束时间点加1毫秒,以避免重复消费同一时间戳的数据。
-
参考官方建议: 根据 CDC模式/使用CDC模式 文档,确保
start_timestamp
的精度,并结合下游消费者的幂等性设计。
问题2:clientId
的作用是什么?是否会通过 clientId
记录 logproxy 消费日志到哪个位置?
clientId
的作用:
clientId
是 oblogclient 的唯一标识符,主要用于以下场景:
- 区分不同的客户端实例。
- 在 oblogproxy 中进行会话管理。
是否会记录消费位置:
根据文档,clientId
本身并不会直接记录消费的日志位点。消费位点的管理依赖于下游消费者的实现(例如 Kafka 的 checkpoint 机制)。clientId
的主要作用是帮助 oblogproxy 区分不同的客户端连接。
问题3:为什么第二次从相同的位置消费时,checkpoint 会发生改变?
原因分析:
-
时间戳范围的影响: 即使指定了相同的
start_timestamp
,由于时间戳范围内的日志可能发生变化(例如新的日志追加),checkpoint 的值可能会不同。
-
checkpoint 的生成机制: checkpoint 是基于当前消费的最新日志位置生成的,因此即使从相同的时间戳开始消费,最终的 checkpoint 值也可能因日志的变化而不同。
关于 clientId
改变后的行为:
如果 clientId
改变,则 oblogproxy 会将其视为一个新的客户端连接,从而可能导致 checkpoint 的重新计算。
建议:
- 确保
clientId
在同一次消费任务中保持不变。
- 如果需要切换
clientId
,建议重新初始化消费位点。
问题4:oblogclient 停止后需要等待多久再启动?在哪里配置?
原因分析:
oblogproxy 和 oblogclient 之间存在连接管理和状态同步的机制。如果 oblogclient 停止后立即重启,可能会因为以下原因导致失败:
- 连接未完全释放。
- 状态同步未完成。
等待时间:
根据实际测试,建议等待至少 15秒以上,以确保连接和状态同步完成。
配置方式:
目前没有明确的配置项可以直接控制等待时间。可以通过以下方式优化:
-
增加超时时间: 在 oblogproxy 的配置中,调整连接超时时间(例如
connection_timeout
)。
-
延迟启动: 在脚本中加入延迟逻辑,确保 oblogclient 停止后有足够的缓冲时间。
综合建议:
-
升级版本: 建议使用更高版本的 oblogproxy 和 oblogclient,以获得更好的兼容性和稳定性。
-
去重逻辑: 在下游消费者中实现去重逻辑,确保数据处理的幂等性。
-
固定
clientId
: 在同一次消费任务中保持 clientId
不变,避免因切换导致的异常行为。
-
延迟启动: 在 oblogclient 停止后,加入适当的延迟逻辑(例如 15秒),以确保连接和状态同步完成。
如果您仍有疑问,建议参考 OceanBase 日志代理服务 官方文档,或联系 OceanBase 技术支持团队获取进一步帮助。
详情请参考:
- 连接管理
- 通过 oblogproxy 进行增量日志代理服务
- 身份鉴别和认证
- 并发控制概述
- 代理高可用
- get_tenant 失败后重试无法退出
- 高可用 FAQ