obcdc返回的日志记录中如何提取事务ID

【 使用环境 】测试环境
【 其他组件 】obcdc
【 使用版本 】4.2.1.8
【问题描述】如题,oceanbase::logmessage::ILogRecord 中是否包含事务id?经观察 id() 方法好像不是事务id。

obcdc 配置

tenant_endpoint=xxx
tenant_user=yyy
tenant_password=zzz
drc_message_factory_binlog_record_type=LogRecordImpl

相关代码:

    int ret = OB_SUCCESS;
    ObCDCFactory cdc_factory;
    IObCDCInstance *obcdc_instance = NULL;

    std::time_t result = std::time(nullptr);
    char *begin_time = std::asctime(std::localtime(&result));

    if (OB_SUCCESS != create_obcdc_instance(cdc_factory, obcdc_instance)) {
        LOG("[ERROR] construct_obcdc_instance failed");
    } else if (NULL == obcdc_instance) {
        ret = OB_ERR_UNEXPECTED;
        LOG("[ERROR] obcdc_instance should not be null!");
    } else {
        LOG("begin init obcdc instance");
        if (OB_SUCCESS != init_obcdc_instance(*obcdc_instance, 1725611361)) {
            LOG("[ERROR] obcdc_instance init failed");
        } else {
            LOG("inited");
            while (OB_SUCCESS == ret) {
                Record *record = NULL;
                if (OB_SUCCESS != (ret = fetch_next_cdc_record(*obcdc_instance, &record))) {
                    if (OB_TIMEOUT == ret) {
                        ret = OB_SUCCESS;
                    } else {
                        LOG("[ERROR] fetch_next_cdc_record failed");
                    }
                } else if (OB_SUCCESS != (ret = handle_cdc_record(record))) {
                    LOG("[ERROR] handle_cdc_record failed");
                } else {
                    if (record != NULL) {
                        if (record->recordType() == oceanbase::logmessage::EINSERT) {
                            oceanbase::logmessage::ITableMeta *tableMeta = NULL;
                            if (OB_SUCCESS != record->getTableMeta(tableMeta)) {
                                LOG("[ERROR] get_table_meta failed");
                            } else {
                                unsigned int new_col_count = 0;
                                oceanbase::logmessage::BinLogBuf *new_cols = record->newCols(new_col_count);
                                size_t timemark_len;
                                vector<long> timemarks = record->getTimemark(timemark_len);
                                cout << "db: " << record->dbname() << endl
                                     << "table: " << tableMeta->getName() << endl
                                     << "timestamp: " << record->getTimestamp() * 1000000 + record->getRecordUsec()
                                     << endl
                                     << "id: " << record->id() << endl;
2 个赞

你可以看看 obcdc的配置文档

obcdc 配置项说明

https://www.oceanbase.com/docs/common-oceanbase-database-cn-1000000001052983

3 个赞

机器人回复?

2 个赞

不是 主要有相关的文档 给你推一下 看看是否能解决你的问题 你稍等 这边问问相关的同学 看一下你的问题

2 个赞

好的,谢谢。这些文档我都提前阅读过。

3 个赞

事务ID放在一个事务中的消息类型为EBEGIN的记录中,通过filterValues这个接口获取,是这个接口返回的数组的第二个元素,是一个字符串,格式是"TenantID_TxID";

3 个赞

好的,谢谢!

2 个赞

EINSERT/EUPDATE/EDELETE调用filterValues获得的返回数组中,第二个元素是 x_y_z 这种格式,与 EBEGIN 中的 TenantID_TxID 有什么联系吗?看起来 x 是 tenantId, z 是 txId,那y的含义是什么?

3 个赞

程序代码和配置文件方便发一下吗?

3 个赞

cdc原理还得参考一下

3 个赞