请用如下cpp代码进行验证,具体使用方式:
- 将如下代码复制到到文件test.cpp中,test.cpp目录需要在/data/oceanbase/redolog/目录下;
- 使用g++工具进行编译,需要支持c++11,命令为: g++ -g -O2 test.cpp -lpthread -std=c++11,改名执行完成后,会生成a.out可执行文件;
- 执行ulimit -c unlimited;
- 执行./a.out > /tmp/test.log,如果出现运行失败,会生成core文件,core的预期在 ‘cat /proc/sys/kernel/core_pattern’ 下;
- 如果程序顺利运行结束,请联系支持同学。
#include <stdint.h>
#include <stdlib.h>
#include <stdio.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#include <iostream>
#include <errno.h>
#include <pthread.h>
#include <string>
#include <assert.h>
using namespace std;
int64_t FILE_SIZE = 100ul * 1024 * 1024 * 1024; // 100G
int64_t WRITE_SIZE = 128; // 必须被4k整除
int64_t ALIGN_SIZE = 4096;
int log_write_flag = 0;
int log_read_flag = 0;
pthread_mutex_t mutex;
void lock()
{
pthread_mutex_lock(&mutex);
}
void unlock()
{
pthread_mutex_unlock(&mutex);
}
class Writer {
public:
void init(const char *file_name);
void append(int64_t write_offset);
void close();
int64_t curr_offset_;
int fd_;
std::string file_name_;
char *write_buf_;
};
void Writer::init(const char *file_name)
{
write_buf_ = static_cast<char*>(aligned_alloc(ALIGN_SIZE, ALIGN_SIZE));
curr_offset_ = 0;
file_name_ = file_name;
fd_ = ::open(file_name_.c_str(), log_write_flag, 0666);
fallocate(fd_, 0, 0, FILE_SIZE);
int64_t remined_size = FILE_SIZE;
}
void Writer::append(int64_t write_offset)
{
int64_t curr_write_size = WRITE_SIZE;
int64_t start_offset = write_offset / ALIGN_SIZE * ALIGN_SIZE;
int64_t valid_buff_len = (write_offset + curr_write_size) % ALIGN_SIZE;
if (valid_buff_len == 0) {
valid_buff_len = ALIGN_SIZE;
}
memset(write_buf_, 'x', valid_buff_len);
if (valid_buff_len != ALIGN_SIZE) {
memset(write_buf_ + valid_buff_len, 'y', 4096 - valid_buff_len);
}
printf("pwrite begin, write_offset:offset:%ld, start_offset:%ld, valid_buff_len:%ld\n", write_offset, start_offset, valid_buff_len);
usleep(10);
::pwrite(fd_, write_buf_, ALIGN_SIZE, start_offset);
printf("pwrite success , write_offset:%ld, start_offset:%ld, valid_buff_len:%ld\n", write_offset, start_offset, valid_buff_len);
lock();
curr_offset_ += curr_write_size;
unlock();
}
void Writer::close()
{
::close(fd_);
}
Writer writer;
void write_func()
{
while (1) {
lock();
int64_t curr_offset = writer.curr_offset_;
unlock();
if (curr_offset < FILE_SIZE - 4096) {
writer.append(curr_offset);
} else {
break;
}
}
writer.close();
}
class Reader {
public:
void init(const char *file_name);
void read(int64_t read_offset);
int fd_;
char *read_buf_;
char *right_buf_;
int64_t last_read_offset_;
std::string file_name_;
char *retry_buf_;
};
void Reader::init(const char *file_name)
{
read_buf_ = static_cast<char*>(aligned_alloc(ALIGN_SIZE, ALIGN_SIZE));
right_buf_ = static_cast<char*>(aligned_alloc(ALIGN_SIZE, ALIGN_SIZE));
retry_buf_ = static_cast<char*>(aligned_alloc(ALIGN_SIZE, ALIGN_SIZE));
memset(right_buf_, 'x', ALIGN_SIZE);
memset(retry_buf_, 'r', ALIGN_SIZE);
file_name_ = file_name;
last_read_offset_ = 0;
fd_ = ::open(file_name_.c_str(), log_read_flag);
}
void Reader::read(int64_t read_offset)
{
int64_t start_offset = (read_offset) / ALIGN_SIZE * ALIGN_SIZE;
int64_t valid_buff_len = read_offset % ALIGN_SIZE;
// memset(read_buf_, 'b', ALIGN_SIZE);
printf("read begin, read_offset:%ld, start_offset:%ld, valid_buff_len:%ld\n", read_offset, start_offset, valid_buff_len);
usleep(3);
::pread(fd_, read_buf_, ALIGN_SIZE, start_offset);
printf("read success, read_offset:%ld, start_offset:%ld, valid_buff_len:%ld\n", read_offset, start_offset, valid_buff_len);
if (0 != memcmp(read_buf_, right_buf_, valid_buff_len)) {
::pread(fd_, retry_buf_, ALIGN_SIZE, start_offset);
int i = 1;
while (0 != memcmp(retry_buf_, right_buf_, valid_buff_len)) {
{
printf("second read failed, offset:%ld, ptr:%p, retry_count: %d", start_offset, retry_buf_, i);
::pread(fd_, retry_buf_, ALIGN_SIZE, start_offset);
i++;
}
}
printf("memcmp failed, offset:%ld, read_buf:%p, retry_buf:%p", start_offset, read_buf_, retry_buf_);
assert(false);
} else {
last_read_offset_ += valid_buff_len;
}
}
Reader reader;
void* read_func(void *)
{
while (1) {
lock();
int64_t curr_offset = writer.curr_offset_;
unlock();
if (curr_offset > 4096) {
reader.read(curr_offset);
} else if (curr_offset > FILE_SIZE - 4096) {
break;
}
}
}
int main(int argc, char **argv)
{
pthread_mutex_init(&mutex, NULL);
std::string log_dir_str = "ob_unittest";
std::string rm_cmd = "rm -rf " + log_dir_str;
std::string mk_cmd = "mkdir " + log_dir_str;
system(rm_cmd.c_str());
system(mk_cmd.c_str());
std::string testfile = log_dir_str + "/" + "testfile";
log_read_flag = O_RDONLY | O_DIRECT | O_SYNC;
log_write_flag = O_RDWR | O_CREAT | O_DIRECT | O_SYNC;
writer.init(testfile.c_str());
reader.init(testfile.c_str());
pthread_t ntid;
pthread_create(&ntid, NULL, read_func, NULL);
write_func();
pthread_join(ntid, NULL);
return 0;
}