leveldb源码学习5-dbformat和comparator

简述

LevelDB第五篇笔记,主要解析dbformat.h/cc两个文件,这两个文件主要是定义了一些数据库在存储方面使用到的一些结构。

依旧采取先注释再分析重点的方式。

dbformat

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
#ifndef STORAGE_LEVELDB_DB_DBFORMAT_H_
#define STORAGE_LEVELDB_DB_DBFORMAT_H_

#include <stdio.h>

#include "leveldb/comparator.h"
#include "leveldb/db.h"
#include "leveldb/filter_policy.h"
#include "leveldb/slice.h"
#include "leveldb/table_builder.h"
#include "util/coding.h"
#include "util/logging.h"

namespace leveldb {


// config域名里定义了一组常量,用户可能希望通过选项设置这些参数。
namespace config {
// LevelDB的级数
static const int kNumLevels = 7;

// 当命中这么多文件时,Level-0开始压缩
static const int kL0_CompactionTrigger = 4;

// Level-0文件个数的软限制,到达这个限制时LevelDB会降低写速度。
static const int kL0_SlowdownWritesTrigger = 8;

// Level-0文件个数的最大限制,到达这个限制时会停止写。
static const int kL0_StopWritesTrigger = 12;

// 如果一个新压缩的memtable没有产生重叠,就将其推到的最大级别。
// 我们尝试将级别提升到2来避免相对昂贵的level 0 => level 1压缩,并避免一些昂贵的manifile操作。
// 我们并不会一直推到最大级别,因为如果相同的key空间被覆盖的话,就会产生大量的磁盘空间浪费。
static const int kMaxMemCompactLevel = 2;

// 在迭代期间读取的数据样本之间的字节数的近似差距,1MB
static const int kReadBytesPeriod = 1048576;

} // namespace config

class InternalKey;

// 该枚举类被编码成InternalKey的最后组成部分。
// 不要改变该枚举类的值,它们被嵌入到磁盘上的数据结构中。
enum ValueType { kTypeDeletion = 0x0, kTypeValue = 0x1 };

// kValueTypeForSeek定义了当构建一个ParsedInternalKey对象时应该被传入用来寻找一个特定sequence number的ValueType值。
// 因为我们会递增排列sequence number,并且ValueType会被嵌入到intervalkey中的sequence number的低8位中,所以我们需要最高编号的ValueType,而不是最低的。
static const ValueType kValueTypeForSeek = kTypeValue;

typedef uint64_t SequenceNumber;

// 我们留出了低位8bit为空,以便sequence和valuetype能够包装在一起成为一个64bits。
static const SequenceNumber kMaxSequenceNumber = ((0x1ull << 56) - 1);

//解析过的InternalKey,即包含user_key、SequenceNumber和ValueType三个字段
struct ParsedInternalKey {
Slice user_key;
SequenceNumber sequence;
ValueType type;

ParsedInternalKey() {} // 出于速率考虑,故意没有初始化
ParsedInternalKey(const Slice& u, const SequenceNumber& seq, ValueType t)
: user_key(u), sequence(seq), type(t) {}
std::string DebugString() const;
};

// 返回key编码后的长度,就是user_key的长度加上8个bytes(sequence和ValuType一共64bits)
inline size_t InternalKeyEncodingLength(const ParsedInternalKey& key) {
return key.user_key.size() + 8;
}

// 将“key”的序列化结果附加到*result。
void AppendInternalKey(std::string* result, const ParsedInternalKey& key);

// 尝试将internal_key转成ParsedInternalKey,成功返回true,并将结果存在result中。
// 若失败,返回false,result处于未定义状态。
bool ParseInternalKey(const Slice& internal_key, ParsedInternalKey* result);

// 返回InternalKey中的user_key部分
// 一个InternalKey其实就是user_key+sequence+valtype,后两者一共占8个byte
inline Slice ExtractUserKey(const Slice& internal_key) {
assert(internal_key.size() >= 8);
return Slice(internal_key.data(), internal_key.size() - 8);
}

// 一个供InternalKey使用的比较器,该比较器使用一个特定的比较器来比较userkey部分并通过减少序sequenceNumber来断开连接。
class InternalKeyComparator : public Comparator {
private:
const Comparator* user_comparator_;

public:
explicit InternalKeyComparator(const Comparator* c) : user_comparator_(c) {}
virtual const char* Name() const;
virtual int Compare(const Slice& a, const Slice& b) const;
virtual void FindShortestSeparator(std::string* start,
const Slice& limit) const;
virtual void FindShortSuccessor(std::string* key) const;

const Comparator* user_comparator() const { return user_comparator_; }

int Compare(const InternalKey& a, const InternalKey& b) const;
};

// 将InternalKey转换成Userkeys的过滤策略包装器
class InternalFilterPolicy : public FilterPolicy {
private:
const FilterPolicy* const user_policy_;

public:
explicit InternalFilterPolicy(const FilterPolicy* p) : user_policy_(p) {}
virtual const char* Name() const;
virtual void CreateFilter(const Slice* keys, int n, std::string* dst) const;
virtual bool KeyMayMatch(const Slice& key, const Slice& filter) const;
};

// LevelDB应该将internal-key封装在InternalKey中而不是原生的string中,这样我们才不至于错误地使用字符串比较而不是InternalKeyComparator。
class InternalKey {
private:
std::string rep_;

public:
InternalKey() {} // rep_留空来表示无效

// 构造InternalKey,最终rep_的结构为 user_key + ( s<<8 | t)
InternalKey(const Slice& user_key, SequenceNumber s, ValueType t) {
AppendInternalKey(&rep_, ParsedInternalKey(user_key, s, t));
}

// 把s赋值给rep_
void DecodeFrom(const Slice& s) { rep_.assign(s.data(), s.size()); }
//就是返回rep_
Slice Encode() const {
assert(!rep_.empty());
return rep_;
}

// 调用ExtractUserKey方法提取rep_中的userkey部分,其实就是抛弃后面8个bye
Slice user_key() const { return ExtractUserKey(rep_); }

// 其实就是reset方法吧
void SetFrom(const ParsedInternalKey& p) {
rep_.clear();
AppendInternalKey(&rep_, p);
}

void Clear() { rep_.clear(); }

std::string DebugString() const;
};

// 其实就是比较两个string
inline int InternalKeyComparator::Compare(const InternalKey& a,
const InternalKey& b) const {
return Compare(a.Encode(), b.Encode());
}

// 尝试把InternalKey转换成ParsedInternalKey,感觉就是InternalKey类构造函数的逆操作
inline bool ParseInternalKey(const Slice& internal_key,
ParsedInternalKey* result) {
const size_t n = internal_key.size();
if (n < 8) return false;
uint64_t num = DecodeFixed64(internal_key.data() + n - 8);
unsigned char c = num & 0xff;
result->sequence = num >> 8;
result->type = static_cast<ValueType>(c);
result->user_key = Slice(internal_key.data(), n - 8);
return (c <= static_cast<unsigned char>(kTypeValue));
}

// A helper class useful for DBImpl::Get()
// db内部在为查找memtable/sstable方便,包装使用的key结构,保存有userkey与SequnceNumber/ValueType dump在内存的数据。
// 对memtable 进行lookup时使用 [start,end], 对sstable lookup时使用[kstart, end]。
class LookupKey {
public:
// 初始化this指针用来查找和某个特定sequence相关的快照中的user_key。
LookupKey(const Slice& user_key, SequenceNumber sequence);

LookupKey(const LookupKey&) = delete;
LookupKey& operator=(const LookupKey&) = delete;

~LookupKey();

// 返回一个MemTable中适合用来lookup的key
Slice memtable_key() const { return Slice(start_, end_ - start_); }

// Return an internal key (suitable for passing to an internal iterator)
Slice internal_key() const { return Slice(kstart_, end_ - kstart_); }

// Return the user key
Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8); }

private:
// 以以下格式构建数组:
// klength varint32 <-- start_
// userkey char[klength] <-- kstart_
// tag uint64
// <-- end_
// 该数组是一个MemTable key.
// 以“userkey”开头的后缀可以用作InternalKey.
const char* start_;
const char* kstart_;
const char* end_;
char space_[200]; // 避免为短key分配
};

inline LookupKey::~LookupKey() {
if (start_ != space_) delete[] start_;
}

} // namespace leveldb

#endif // STORAGE_LEVELDB_DB_DBFORMAT_H_

头文件中一些方法的实现都在对应的源文件中,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
// dbformat.cc
#include "db/dbformat.h"

#include <stdio.h>

#include "port/port.h"
#include "util/coding.h"

namespace leveldb {

// 将sequence和ValueType整合到一个无符号64位整型中
static uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {
assert(seq <= kMaxSequenceNumber);
assert(t <= kValueTypeForSeek);
return (seq << 8) | t;
}

// 将key追加到result中
void AppendInternalKey(std::string* result, const ParsedInternalKey& key) {
result->append(key.user_key.data(), key.user_key.size());
PutFixed64(result, PackSequenceAndType(key.sequence, key.type));
}

// 该方法就是把ParsedInternalKey打印一下,用来debug
std::string ParsedInternalKey::DebugString() const {
char buf[50];
snprintf(buf, sizeof(buf), "' @ %llu : %d", (unsigned long long)sequence,
int(type));
std::string result = "'";
// EscapeString方法将返回一个便于人阅读的格式的string
result += EscapeString(user_key.ToString());
result += buf;
return result;
}

// 尝试把InternalKey转成ParsedInternalKey再调输出debug信息
// 转换失败的时候,则会在result里添加bad信息,并直接调用EscapeString方法
std::string InternalKey::DebugString() const {
std::string result;
ParsedInternalKey parsed;
if (ParseInternalKey(rep_, &parsed)) {
result = parsed.DebugString();
} else {
result = "(bad)";
result.append(EscapeString(rep_));
}
return result;
}

const char* InternalKeyComparator::Name() const {
return "leveldb.InternalKeyComparator";
}

int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
// 按照user_key升序,sequence_number和value_type降序,其中user_key升序使用用户提供的比较器
int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
if (r == 0) {
const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8);
const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8);
if (anum > bnum) {
r = -1;
} else if (anum < bnum) {
r = +1;
}
}
return r;
}

// 接下来两个方法到用到的地方再看下
void InternalKeyComparator::FindShortestSeparator(std::string* start,
const Slice& limit) const {
// 尝试缩短key的用户部分
Slice user_start = ExtractUserKey(*start);
Slice user_limit = ExtractUserKey(limit);
std::string tmp(user_start.data(), user_start.size());
user_comparator_->FindShortestSeparator(&tmp, user_limit);
if (tmp.size() < user_start.size() &&
user_comparator_->Compare(user_start, tmp) < 0) {
// User key has become shorter physically, but larger logically.
// Tack on the earliest possible number to the shortened user key.
PutFixed64(&tmp,
PackSequenceAndType(kMaxSequenceNumber, kValueTypeForSeek));
assert(this->Compare(*start, tmp) < 0);
assert(this->Compare(tmp, limit) < 0);
start->swap(tmp);
}
}

void InternalKeyComparator::FindShortSuccessor(std::string* key) const {
Slice user_key = ExtractUserKey(*key);
std::string tmp(user_key.data(), user_key.size());
user_comparator_->FindShortSuccessor(&tmp);
if (tmp.size() < user_key.size() &&
user_comparator_->Compare(user_key, tmp) < 0) {
// user_key逻辑上变大而实际上变小了。
// 将尽可能早的数字添加到缩短的user_key上。【TODO】
PutFixed64(&tmp,
PackSequenceAndType(kMaxSequenceNumber, kValueTypeForSeek));
assert(this->Compare(*key, tmp) < 0);
key->swap(tmp);
}
}

const char* InternalFilterPolicy::Name() const { return user_policy_->Name(); }

void InternalFilterPolicy::CreateFilter(const Slice* keys, int n,
std::string* dst) const {
// 我们依赖table.cc中的代码不会在意我们修改keys[]这样的一个实时。【TODO】
Slice* mkey = const_cast<Slice*>(keys);
for (int i = 0; i < n; i++) {
mkey[i] = ExtractUserKey(keys[i]);
// TODO(sanjay): Suppress dups?
}
user_policy_->CreateFilter(keys, n, dst);
}

bool InternalFilterPolicy::KeyMayMatch(const Slice& key, const Slice& f) const {
return user_policy_->KeyMayMatch(ExtractUserKey(key), f);
}

// 根据user_key和sequence_number构造出LookupKey
LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) {
size_t usize = user_key.size();
size_t needed = usize + 13; // 一个保守估计
char* dst;
if (needed <= sizeof(space_)) {
dst = space_;
} else {
dst = new char[needed];
}
start_ = dst;
dst = EncodeVarint32(dst, usize + 8);
kstart_ = dst;
memcpy(dst, user_key.data(), usize);
dst += usize;
EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek));
dst += 8;
end_ = dst;
}

} // namespace leveldb

Comparator

简单看一下比较器的头文件和实现文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
//comparator.h
#ifndef STORAGE_LEVELDB_INCLUDE_COMPARATOR_H_
#define STORAGE_LEVELDB_INCLUDE_COMPARATOR_H_

#include <string>

#include "leveldb/export.h"

namespace leveldb {

class Slice;

// 比较器提供sstable或者database中key的排序方法。
// 一个比较器的实现必须是线程安全的,因为leveldb可能会同时从多个线程调用比较器的方法。
class LEVELDB_EXPORT Comparator {
public:
virtual ~Comparator();

// 三种比较结果:
// < 0 iff "a" < "b",
// == 0 iff "a" == "b",
// > 0 iff "a" > "b"
virtual int Compare(const Slice& a, const Slice& b) const = 0;

// 比较器的名字,用来检查比较器能不能match上(比如数据库创建时使用的名字和后来使用的不一致)
//
// 如果比较器实的实现改变了并且会造成任何多个key的相关顺序改变,那客户端就应该换一个新名字。
//
// 以'level.'开头的名字被保留,客户端不要使用
virtual const char* Name() const = 0;

// 高级方法:它们用于减少索引块等内部数据结构的空间需求。
// 如果*start < limit,改变*start变成一个短字符串[start,limit)
// 简单的比较器实现可能不会改变*start并直接返回
// 比如,一个什么也不做的实现也是可以的
virtual void FindShortestSeparator(std::string* start,
const Slice& limit) const = 0;

// 改变*key变成一个更短但>=*key的短字符串
// 简单的比较器实现可能不会改变*key并直接返回
// 比如,一个什么也不做的实现也是可以的
virtual void FindShortSuccessor(std::string* key) const = 0;
};

// 返回一个内置的使用字典序的比较器
// 返回结果仍然是这个模块的属性,不能删除。
LEVELDB_EXPORT const Comparator* BytewiseComparator();

} // namespace leveldb

#endif // STORAGE_LEVELDB_INCLUDE_COMPARATOR_H_

其中几个方法的实现如下,其实实现文件是Comparator类的一个内置子类,就是上面头文件最后出现的BytewiseComparator:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include <algorithm>
#include <cstdint>
#include <string>

#include "leveldb/comparator.h"
#include "leveldb/slice.h"
#include "util/logging.h"
#include "util/no_destructor.h"

namespace leveldb {

Comparator::~Comparator() {}

namespace {
class BytewiseComparatorImpl : public Comparator {
public:
BytewiseComparatorImpl() {}

//比较器名字,可见果然用了level.做前缀
virtual const char* Name() const { return "leveldb.BytewiseComparator"; }
//比较
virtual int Compare(const Slice& a, const Slice& b) const {
return a.compare(b);
}

//获得大于start但小于limit的最小值
virtual void FindShortestSeparator(std::string* start,
const Slice& limit) const {
// 找到公共前缀的长度
size_t min_length = std::min(start->size(), limit.size());
size_t diff_index = 0;
while ((diff_index < min_length) &&
((*start)[diff_index] == limit[diff_index])) {
diff_index++;
}

if (diff_index >= min_length) {
// 如果一个字符串是另一个字符串的前缀,则不做改变
} else {
//很奇特的一段逻辑,将较小的字符串的最后一位+1,并去掉后面的字符
//这时才回头看上面dbformat.cc文件中的那句注释“user_key逻辑上变大而实际上变小了。”,就理解意思了。
//逻辑上变大:因为字符串最后的字母+1了,比如'b'变成了'a',那么在字典序中就靠前了一些,变大了一些
//实际上变小:字符串长度变短了
//比如传入的参数是"abcg"和"abe",那运行后"abe"还是"abe",但"abcd"已经变成了"abd"
uint8_t diff_byte = static_cast<uint8_t>((*start)[diff_index]);
if (diff_byte < static_cast<uint8_t>(0xff) &&
diff_byte + 1 < static_cast<uint8_t>(limit[diff_index])) {
(*start)[diff_index]++;
start->resize(diff_index + 1);
assert(Compare(*start, limit) < 0);
}
}
}

// 找到第一个可以递增的字符,递增它,然后去掉后面的字符
// 获得比start大的最小值
virtual void FindShortSuccessor(std::string* key) const {
// 找到第一个可以递增的字符
size_t n = key->size();
for (size_t i = 0; i < n; i++) {
const uint8_t byte = (*key)[i];
if (byte != static_cast<uint8_t>(0xff)) {
(*key)[i] = byte + 1;
key->resize(i + 1);
return;
}
}
// *key 是0xff,保留。
}
};
} // namespace

//static单例
const Comparator* BytewiseComparator() {
static NoDestructor<BytewiseComparatorImpl> singleton;
return singleton.get();
}

} // namespace leveldb