简述
LevelDB第五篇笔记,主要解析dbformat.h/cc两个文件,这两个文件主要是定义了一些数据库在存储方面使用到的一些结构。
依旧采取先注释再分析重点的方式。
dbformat
1 |
|
头文件中一些方法的实现都在对应的源文件中,如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141// dbformat.cc
namespace leveldb {
// 将sequence和ValueType整合到一个无符号64位整型中
static uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {
assert(seq <= kMaxSequenceNumber);
assert(t <= kValueTypeForSeek);
return (seq << 8) | t;
}
// 将key追加到result中
void AppendInternalKey(std::string* result, const ParsedInternalKey& key) {
result->append(key.user_key.data(), key.user_key.size());
PutFixed64(result, PackSequenceAndType(key.sequence, key.type));
}
// 该方法就是把ParsedInternalKey打印一下,用来debug
std::string ParsedInternalKey::DebugString() const {
char buf[50];
snprintf(buf, sizeof(buf), "' @ %llu : %d", (unsigned long long)sequence,
int(type));
std::string result = "'";
// EscapeString方法将返回一个便于人阅读的格式的string
result += EscapeString(user_key.ToString());
result += buf;
return result;
}
// 尝试把InternalKey转成ParsedInternalKey再调输出debug信息
// 转换失败的时候,则会在result里添加bad信息,并直接调用EscapeString方法
std::string InternalKey::DebugString() const {
std::string result;
ParsedInternalKey parsed;
if (ParseInternalKey(rep_, &parsed)) {
result = parsed.DebugString();
} else {
result = "(bad)";
result.append(EscapeString(rep_));
}
return result;
}
const char* InternalKeyComparator::Name() const {
return "leveldb.InternalKeyComparator";
}
int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
// 按照user_key升序,sequence_number和value_type降序,其中user_key升序使用用户提供的比较器
int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
if (r == 0) {
const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8);
const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8);
if (anum > bnum) {
r = -1;
} else if (anum < bnum) {
r = +1;
}
}
return r;
}
// 接下来两个方法到用到的地方再看下
void InternalKeyComparator::FindShortestSeparator(std::string* start,
const Slice& limit) const {
// 尝试缩短key的用户部分
Slice user_start = ExtractUserKey(*start);
Slice user_limit = ExtractUserKey(limit);
std::string tmp(user_start.data(), user_start.size());
user_comparator_->FindShortestSeparator(&tmp, user_limit);
if (tmp.size() < user_start.size() &&
user_comparator_->Compare(user_start, tmp) < 0) {
// User key has become shorter physically, but larger logically.
// Tack on the earliest possible number to the shortened user key.
PutFixed64(&tmp,
PackSequenceAndType(kMaxSequenceNumber, kValueTypeForSeek));
assert(this->Compare(*start, tmp) < 0);
assert(this->Compare(tmp, limit) < 0);
start->swap(tmp);
}
}
void InternalKeyComparator::FindShortSuccessor(std::string* key) const {
Slice user_key = ExtractUserKey(*key);
std::string tmp(user_key.data(), user_key.size());
user_comparator_->FindShortSuccessor(&tmp);
if (tmp.size() < user_key.size() &&
user_comparator_->Compare(user_key, tmp) < 0) {
// user_key逻辑上变大而实际上变小了。
// 将尽可能早的数字添加到缩短的user_key上。【TODO】
PutFixed64(&tmp,
PackSequenceAndType(kMaxSequenceNumber, kValueTypeForSeek));
assert(this->Compare(*key, tmp) < 0);
key->swap(tmp);
}
}
const char* InternalFilterPolicy::Name() const { return user_policy_->Name(); }
void InternalFilterPolicy::CreateFilter(const Slice* keys, int n,
std::string* dst) const {
// 我们依赖table.cc中的代码不会在意我们修改keys[]这样的一个实时。【TODO】
Slice* mkey = const_cast<Slice*>(keys);
for (int i = 0; i < n; i++) {
mkey[i] = ExtractUserKey(keys[i]);
// TODO(sanjay): Suppress dups?
}
user_policy_->CreateFilter(keys, n, dst);
}
bool InternalFilterPolicy::KeyMayMatch(const Slice& key, const Slice& f) const {
return user_policy_->KeyMayMatch(ExtractUserKey(key), f);
}
// 根据user_key和sequence_number构造出LookupKey
LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) {
size_t usize = user_key.size();
size_t needed = usize + 13; // 一个保守估计
char* dst;
if (needed <= sizeof(space_)) {
dst = space_;
} else {
dst = new char[needed];
}
start_ = dst;
dst = EncodeVarint32(dst, usize + 8);
kstart_ = dst;
memcpy(dst, user_key.data(), usize);
dst += usize;
EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek));
dst += 8;
end_ = dst;
}
} // namespace leveldb
Comparator
简单看一下比较器的头文件和实现文件。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51//comparator.h
namespace leveldb {
class Slice;
// 比较器提供sstable或者database中key的排序方法。
// 一个比较器的实现必须是线程安全的,因为leveldb可能会同时从多个线程调用比较器的方法。
class LEVELDB_EXPORT Comparator {
public:
virtual ~Comparator();
// 三种比较结果:
// < 0 iff "a" < "b",
// == 0 iff "a" == "b",
// > 0 iff "a" > "b"
virtual int Compare(const Slice& a, const Slice& b) const = 0;
// 比较器的名字,用来检查比较器能不能match上(比如数据库创建时使用的名字和后来使用的不一致)
//
// 如果比较器实的实现改变了并且会造成任何多个key的相关顺序改变,那客户端就应该换一个新名字。
//
// 以'level.'开头的名字被保留,客户端不要使用
virtual const char* Name() const = 0;
// 高级方法:它们用于减少索引块等内部数据结构的空间需求。
// 如果*start < limit,改变*start变成一个短字符串[start,limit)
// 简单的比较器实现可能不会改变*start并直接返回
// 比如,一个什么也不做的实现也是可以的
virtual void FindShortestSeparator(std::string* start,
const Slice& limit) const = 0;
// 改变*key变成一个更短但>=*key的短字符串
// 简单的比较器实现可能不会改变*key并直接返回
// 比如,一个什么也不做的实现也是可以的
virtual void FindShortSuccessor(std::string* key) const = 0;
};
// 返回一个内置的使用字典序的比较器
// 返回结果仍然是这个模块的属性,不能删除。
LEVELDB_EXPORT const Comparator* BytewiseComparator();
} // namespace leveldb
其中几个方法的实现如下,其实实现文件是Comparator类的一个内置子类,就是上面头文件最后出现的BytewiseComparator:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
namespace leveldb {
Comparator::~Comparator() {}
namespace {
class BytewiseComparatorImpl : public Comparator {
public:
BytewiseComparatorImpl() {}
//比较器名字,可见果然用了level.做前缀
virtual const char* Name() const { return "leveldb.BytewiseComparator"; }
//比较
virtual int Compare(const Slice& a, const Slice& b) const {
return a.compare(b);
}
//获得大于start但小于limit的最小值
virtual void FindShortestSeparator(std::string* start,
const Slice& limit) const {
// 找到公共前缀的长度
size_t min_length = std::min(start->size(), limit.size());
size_t diff_index = 0;
while ((diff_index < min_length) &&
((*start)[diff_index] == limit[diff_index])) {
diff_index++;
}
if (diff_index >= min_length) {
// 如果一个字符串是另一个字符串的前缀,则不做改变
} else {
//很奇特的一段逻辑,将较小的字符串的最后一位+1,并去掉后面的字符
//这时才回头看上面dbformat.cc文件中的那句注释“user_key逻辑上变大而实际上变小了。”,就理解意思了。
//逻辑上变大:因为字符串最后的字母+1了,比如'b'变成了'a',那么在字典序中就靠前了一些,变大了一些
//实际上变小:字符串长度变短了
//比如传入的参数是"abcg"和"abe",那运行后"abe"还是"abe",但"abcd"已经变成了"abd"
uint8_t diff_byte = static_cast<uint8_t>((*start)[diff_index]);
if (diff_byte < static_cast<uint8_t>(0xff) &&
diff_byte + 1 < static_cast<uint8_t>(limit[diff_index])) {
(*start)[diff_index]++;
start->resize(diff_index + 1);
assert(Compare(*start, limit) < 0);
}
}
}
// 找到第一个可以递增的字符,递增它,然后去掉后面的字符
// 获得比start大的最小值
virtual void FindShortSuccessor(std::string* key) const {
// 找到第一个可以递增的字符
size_t n = key->size();
for (size_t i = 0; i < n; i++) {
const uint8_t byte = (*key)[i];
if (byte != static_cast<uint8_t>(0xff)) {
(*key)[i] = byte + 1;
key->resize(i + 1);
return;
}
}
// *key 是0xff,保留。
}
};
} // namespace
//static单例
const Comparator* BytewiseComparator() {
static NoDestructor<BytewiseComparatorImpl> singleton;
return singleton.get();
}
} // namespace leveldb