Redis Core Internals: The Data Persistence Process and the RDB File

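RDB is one of the ways Redis persists in-memory data to disk. A Redis instance starts RDB persistence in three situations: when a client issues the BGSAVE command, when a master replicates to a slave in a master-slave setup, or when the save parameters in the Redis configuration file are met. In each case Redis forks a child process to write the RDB file; the parent process then detects whether the child finished normally and handles the resulting RDB file.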

RDB structures and format definitions

RDB file format:

#define REDIS_RDB_OPCODE_EXPIRETIME_MS 252
#define REDIS_RDB_OPCODE_EXPIRETIME 253
#define REDIS_RDB_OPCODE_SELECTDB   254
#define REDIS_RDB_OPCODE_EOF        255

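These four macros are the special opcodes of an RDB file. REDIS_RDB_OPCODE_EXPIRETIME_MS and REDIS_RDB_OPCODE_EXPIRETIME indicate that the object that follows has an expire time (in milliseconds and in seconds respectively), REDIS_RDB_OPCODE_SELECTDB prefixes a database number, and REDIS_RDB_OPCODE_EOF marks the end of the RDB data.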

REDIS_MAGIC_NUMBER | REDIS_RDB_OPCODE_SELECTDB | database number | REDIS_EXPIRETIME opcode (optional) | timestamp (optional) | type | key | object | …… | REDIS_EXPIRETIME opcode (optional) | timestamp (optional) | type | key | object | …… | REDIS_RDB_OPCODE_EOF | CHECKSUM

Type markers Redis writes before each object in the RDB file

#define REDIS_RDB_TYPE_STRING 0
#define REDIS_RDB_TYPE_LIST   1
#define REDIS_RDB_TYPE_SET    2
#define REDIS_RDB_TYPE_ZSET   3
#define REDIS_RDB_TYPE_HASH   4
 
#define REDIS_RDB_TYPE_HASH_ZIPMAP    9
#define REDIS_RDB_TYPE_LIST_ZIPLIST  10
#define REDIS_RDB_TYPE_SET_INTSET    11
#define REDIS_RDB_TYPE_ZSET_ZIPLIST  12
#define REDIS_RDB_TYPE_HASH_ZIPLIST  13

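These markers correspond to the in-memory encodings of the Redis types; only the macro names differ. REDIS_RDB_TYPE_LIST denotes a list encoded as REDIS_ENCODING_LINKEDLIST, REDIS_RDB_TYPE_SET a set encoded as REDIS_ENCODING_HT, REDIS_RDB_TYPE_ZSET a sorted set encoded as REDIS_ENCODING_SKIPLIST, and REDIS_RDB_TYPE_HASH a hash encoded as REDIS_ENCODING_HT. The five macros below them denote five other encodings. In short, the RDB type markers broadly follow the in-memory encodings.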

How each object type is stored in the RDB file

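String type: when the logical type is REDIS_STRING, Redis tries to store the value as an integer whether its `encoding` is raw or int, because an integer takes far less space than a string. Why try integer encoding when `encoding` is REDIS_ENCODING_RAW? Because each DB key is an sds string that is first wrapped in a REDIS_ENCODING_RAW robj before it is saved, and that key may well be an integer. Whether the value ends up stored as an integer or as a string, the type marker written before the object is REDIS_RDB_TYPE_STRING.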

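The String format in the RDB file resembles the encoding used by ziplist. For REDIS_ENCODING_RAW the length is saved first, followed by the raw string; for REDIS_ENCODING_INT the integer is saved directly. A first byte whose top two bits are 00, 01 or 10 is a compact length encoding; top bits 11 mean that what follows is an integer-encoded object or an LZF-compressed string:

1. 00|000000 => the top two bits are 00; the remaining 6 bits hold the length

2. 01|000000 00000000 => the top two bits are 01; the remaining 14 bits hold the length

3. 10|000000 [32 bit integer] => the top two bits are 10; the length is stored in the next 4 bytes

4. 11|000000 => the top two bits are 11; the String object that follows is specially encoded, and the remaining 6 bits say whether it is a 1-byte, 2-byte or 4-byte integer, or an LZF-compressed string.

Note the difference between a `length` and a `String object encoded as an integer`. Both start with a one-byte prefix, but a length is packed into the prefix itself (or the bytes right after it), whereas an integer-encoded String uses the prefix byte only to declare how many bytes the integer occupies, and the integer follows.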

#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
#define REDIS_RDB_LENERR UINT_MAX
 
#define REDIS_RDB_ENC_INT8 0        /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1       /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2       /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3         /* string compressed with FASTLZ */
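
To make the three length forms concrete, here is a minimal sketch of an encoder that follows the rules listed above. The function name `rdb_encode_len` and the caller-supplied buffer are assumptions made for illustration; in Redis the corresponding work is done by rdbSaveLen(), which writes to a rio stream instead.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical helper (not a Redis function): writes the RDB length prefix
 * for `len` into `buf` (at least 5 bytes) and returns the bytes used. */
size_t rdb_encode_len(uint32_t len, unsigned char *buf) {
    assert(buf != NULL);
    if (len < (1u << 6)) {
        /* 00|LLLLLL: the length fits in the low 6 bits of one byte. */
        buf[0] = (unsigned char)(len & 0x3F);
        return 1;
    }
    if (len < (1u << 14)) {
        /* 01|LLLLLL LLLLLLLL: 14-bit length, high bits first. */
        buf[0] = (unsigned char)(((len >> 8) & 0x3F) | (1u << 6));
        buf[1] = (unsigned char)(len & 0xFF);
        return 2;
    }
    /* 10|000000 followed by the length as a 32-bit big-endian integer. */
    buf[0] = (unsigned char)(2u << 6);
    buf[1] = (unsigned char)((len >> 24) & 0xFF);
    buf[2] = (unsigned char)((len >> 16) & 0xFF);
    buf[3] = (unsigned char)((len >> 8) & 0xFF);
    buf[4] = (unsigned char)(len & 0xFF);
    return 5;
}
```

For example, a length of 300 takes two bytes, 0x41 0x2C: the 01 marker, then the 14-bit value 0x012C.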

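String, raw string: REDIS_RDB_TYPE_STRING | length encoding | raw string

String, integer: REDIS_RDB_TYPE_STRING | 110000(00, 01 or 10) | integer

String, LZF-compressed: REDIS_RDB_TYPE_STRING | 11000011 | compressed length (length encoding) | original length (length encoding) | LZF-compressed string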
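
The integer form can be sketched as follows. This is an illustration under stated assumptions, not the Redis code: the name `rdb_encode_integer` and the enum are hypothetical, and the integer bytes are assumed to be written least-significant byte first.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Values copied from the REDIS_RDB_ENC_* macros quoted above. */
enum { ENC_INT8 = 0, ENC_INT16 = 1, ENC_INT32 = 2 };

/* Hypothetical helper: encodes `value` into `enc` (at least 5 bytes) as
 * 11|0000TT followed by a little-endian integer. Returns the number of
 * bytes written, or 0 if the value does not fit in 32 bits. */
size_t rdb_encode_integer(long long value, unsigned char *enc) {
    uint64_t u = (uint64_t)value;  /* shift on unsigned to stay well defined */
    assert(enc != NULL);
    if (value >= INT8_MIN && value <= INT8_MAX) {
        enc[0] = (unsigned char)((3u << 6) | ENC_INT8);
        enc[1] = (unsigned char)(u & 0xFF);
        return 2;
    }
    if (value >= INT16_MIN && value <= INT16_MAX) {
        enc[0] = (unsigned char)((3u << 6) | ENC_INT16);
        enc[1] = (unsigned char)(u & 0xFF);
        enc[2] = (unsigned char)((u >> 8) & 0xFF);
        return 3;
    }
    if (value >= INT32_MIN && value <= INT32_MAX) {
        enc[0] = (unsigned char)((3u << 6) | ENC_INT32);
        enc[1] = (unsigned char)(u & 0xFF);
        enc[2] = (unsigned char)((u >> 8) & 0xFF);
        enc[3] = (unsigned char)((u >> 16) & 0xFF);
        enc[4] = (unsigned char)((u >> 24) & 0xFF);
        return 5;
    }
    return 0;  /* too large: the caller falls back to string storage */
}
```

The value 1000 becomes three bytes, 0xC1 0xE8 0x03, instead of a length byte plus the four characters "1000".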

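The list type has two implementations, ziplist and linkedlist. Saving a ziplist is the simplest case: the byte sequence the ziplist occupies in memory is written to disk as is (through rdbSaveRawString(), so it is stored like a string). A linkedlist is saved by writing the number of elements first and then walking the list and saving each element in the String format.

List, ziplist encoding: REDIS_RDB_TYPE_LIST_ZIPLIST | ziplist byte sequence

List, linkedlist encoding: REDIS_RDB_TYPE_LIST | list length (length encoding) | (element saved as String)+

The set type has two encodings, hash table and intset. An intset is also saved by copying its in-memory byte sequence to disk. A hash-table set is saved by writing the number of entries first and then iterating over the table; because the members of a set live in the keys of the dict, only one String element is stored per entry.

Set, intset encoding: REDIS_RDB_TYPE_SET_INTSET | intset byte sequence

Set, hash table encoding: REDIS_RDB_TYPE_SET | set length (length encoding) | (element saved as String)+

The zset type has two implementations, ziplist and skiplist. A ziplist is saved as raw bytes. A skiplist-encoded zset also keeps a dict that references the same elements, so saving that dict is enough: the number of entries is written first, then every key and value. The key is a String object and the value is a double, which is converted to its string representation before being saved.

Zset, ziplist encoding: REDIS_RDB_TYPE_ZSET_ZIPLIST | ziplist byte sequence

Zset, skiplist encoding: REDIS_RDB_TYPE_ZSET | zset length (length encoding) | (key saved as String | one byte holding the length of the double's string representation | the double's string representation)+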
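
The score layout (one length byte, then the text form of the double) can be sketched like this. The function name and buffer size are hypothetical. The `%.17g` format and the reserved length bytes 253, 254 and 255 for NaN, +inf and -inf are not stated in this article; they are what rdbSaveDoubleValue() is understood to use, so treat them as assumptions to check against the source.

```c
#include <assert.h>
#include <math.h>
#include <stddef.h>
#include <stdio.h>

/* Hypothetical helper: writes a zset score into `buf` (128 bytes) as
 * [1-byte length][decimal text]. Returns the number of bytes written. */
size_t rdb_encode_double(double val, unsigned char buf[128]) {
    int n;
    assert(buf != NULL);
    /* Assumed special markers: a single byte and no text follows. */
    if (isnan(val)) { buf[0] = 253; return 1; }
    if (isinf(val)) { buf[0] = (val < 0) ? 255 : 254; return 1; }

    /* 17 significant digits are enough to round-trip an IEEE 754 double. */
    n = snprintf((char *)buf + 1, 127, "%.17g", val);
    assert(n > 0 && n < 127);
    buf[0] = (unsigned char)n;       /* length of the text that follows */
    return (size_t)n + 1;
}
```

A score of 1.5 is therefore stored as the four bytes 0x03 '1' '.' '5'.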

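The hash type has two encodings, hash table and ziplist. A ziplist is saved as raw bytes. A hash-table hash is saved by writing the number of entries first and then each key and value, both in the String format.

Hash, ziplist encoding: REDIS_RDB_TYPE_HASH_ZIPLIST | ziplist byte sequence

Hash, hash table encoding: REDIS_RDB_TYPE_HASH | hash length (length encoding) | (key saved as String | value saved as String)+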

How an RDB file is saved

When a client sends a BGSAVE request, when a slave asks for replication, or when the save parameters in the configuration file are met, the Redis instance calls rdbSaveBackground().

void bgsaveCommand(redisClient *c) {
    if (server.rdb_child_pid != -1) {
        addReplyError(c,"Background save already in progress");
    } else if (server.aof_child_pid != -1) {
        addReplyError(c,"Can't BGSAVE while AOF log rewriting is in progress");
    } else if (rdbSaveBackground(server.rdb_filename) == REDIS_OK) {
        addReplyStatus(c,"Background saving started");
    } else {
        addReply(c,shared.err);
    }
}
/* Fragment of serverCron(), which checks the save parameters: */
        /* If there is not a background saving/rewrite in progress check if
         * we have to save/rewrite now */
         for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;
 
            if (server.dirty >= sp->changes &&
                server.unixtime-server.lastsave > sp->seconds) {
                redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, sp->seconds);
                rdbSaveBackground(server.rdb_filename);
                break;
            }
         }

rdbSaveBackground() forks a child process that runs rdbSave():

int rdbSaveBackground(char *filename) {
    pid_t childpid;
    long long start;
 
    if (server.rdb_child_pid != -1) return REDIS_ERR;
 
    server.dirty_before_bgsave = server.dirty;
 
    start = ustime();
    if ((childpid = fork()) == 0) {
        int retval;
 
        /* Child */
        if (server.ipfd > 0) close(server.ipfd);
        if (server.sofd > 0) close(server.sofd);
        retval = rdbSave(filename);
        exitFromChild((retval == REDIS_OK) ? 0 : 1);
    } else {
        /* Parent */
        server.stat_fork_time = ustime()-start;
        if (childpid == -1) {
            redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
                strerror(errno));
            return REDIS_ERR;
        }
        redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
        server.rdb_save_time_start = time(NULL);
        server.rdb_child_pid = childpid;
        updateDictResizePolicy();
        return REDIS_OK;
    }
    return REDIS_OK; /* unreached */
}
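
The fork-and-report pattern used by rdbSaveBackground() can be reduced to a small standalone sketch. All names below (`save_fn`, `start_background_job`, `poll_background_job`, `demo_job`) are hypothetical; the non-blocking poll stands in for the check the parent performs periodically, and the exit code convention (0 for success, 1 for failure) follows the listing above.

```c
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

typedef int (*save_fn)(void *arg);   /* returns 0 on success */

/* Demo job for illustration: "succeeds" iff the int behind arg is 0. */
int demo_job(void *arg) { return *(int *)arg; }

/* Runs fn(arg) in a forked child. Returns the child's pid to the parent,
 * or -1 if fork() failed. */
pid_t start_background_job(save_fn fn, void *arg) {
    pid_t pid;
    assert(fn != NULL);
    pid = fork();
    if (pid == 0) {
        /* Child: sees the parent's memory as it was at the fork. */
        _exit(fn(arg) == 0 ? 0 : 1);
    }
    return pid;
}

/* Non-blocking check of the child. Returns -1 while it is still running,
 * 0 if it exited with status 0, and 1 for any other outcome. */
int poll_background_job(pid_t pid) {
    int status;
    pid_t r = waitpid(pid, &status, WNOHANG);
    if (r == 0) return -1;
    if (r == pid && WIFEXITED(status) && WEXITSTATUS(status) == 0) return 0;
    return 1;
}
```

The parent never blocks on the save: it records the pid, keeps serving requests, and calls the poll function from time to time until it stops returning -1.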

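rdbSave() first creates a temporary file, which is renamed after a successful save. It then initializes rio (an abstraction that wraps I/O operations, used here for the RDB save) and writes the REDIS MAGIC NUMBER at the start of the file. When server.rdb_checksum is enabled, every write to the file also updates a running checksum, which is used later to validate the RDB file when it is loaded.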

int rdbSave(char *filename) {
    dictIterator *di = NULL;
    dictEntry *de;
    char tmpfile[256];
    char magic[10];
    int j;
    long long now = mstime();
    FILE *fp;
    rio rdb;
    uint64_t cksum;
 
    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s",
            strerror(errno));
        return REDIS_ERR;
    }
 
    rioInitWithFile(&rdb,fp);
    if (server.rdb_checksum)
        rdb.update_cksum = rioGenericUpdateChecksum;
    snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION);
    if (rdbWriteRaw(&rdb,magic,9) == -1) goto werr;
    ……
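
The optional `update_cksum` hook seen above can be illustrated with a small in-memory writer. This is only a sketch: `mem_writer` and its functions are hypothetical, and the checksum is a deliberately simple stand-in (multiply by 31 and add each byte), not the CRC64 that Redis computes.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef struct mem_writer {
    unsigned char buf[256];
    size_t pos;
    uint64_t cksum;   /* stays 0 when checksumming is disabled */
    void (*update_cksum)(struct mem_writer *w, const void *data, size_t len);
} mem_writer;

/* Stand-in checksum, NOT CRC64: folds each byte into the running value. */
void toy_update_cksum(mem_writer *w, const void *data, size_t len) {
    const unsigned char *p = data;
    size_t i;
    for (i = 0; i < len; i++) w->cksum = w->cksum * 31 + p[i];
}

void writer_init(mem_writer *w, int checksum_enabled) {
    assert(w != NULL);
    memset(w, 0, sizeof(*w));
    w->update_cksum = checksum_enabled ? toy_update_cksum : NULL;
}

/* Appends data; the hook, when set, sees every byte that is written.
 * Returns 0 on success, -1 if the buffer is full. */
int writer_write(mem_writer *w, const void *data, size_t len) {
    if (len > sizeof(w->buf) - w->pos) return -1;
    if (w->update_cksum) w->update_cksum(w, data, len);
    memcpy(w->buf + w->pos, data, len);
    w->pos += len;
    return 0;
}
```

Because the hook runs inside the write path, the final value does not depend on how the output was split into calls, and a writer with the hook unset ends with a checksum of zero.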

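Continuing rdbSave(): the loop walks every database in the Redis instance, and dictGetSafeIterator() returns an iterator over the dict of the current database. For each non-empty DB it first writes the REDIS_RDB_OPCODE_SELECTDB prefix and the DB number. rdbSaveLen() implements the compact length encoding, storing a length in 6, 14 or 32 bits. The loop then iterates over the DB, fetching each key and value together with the expire time, if there is one. At the DB level a key is an sds string, so it is first wrapped in an robj and then saved in the String format. rdbSaveKeyValuePair() inspects the value type and saves the value in the matching format.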

……
for (j = 0; j < server.dbnum; j++) {
    redisDb *db = server.db+j;
    dict *d = db->dict;
    if (dictSize(d) == 0) continue;
    di = dictGetSafeIterator(d);
    if (!di) {
        fclose(fp);
        return REDIS_ERR;
    }
    if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr;
    if (rdbSaveLen(&rdb,j) == -1) goto werr;
 
    while((de = dictNext(di)) != NULL) {
        sds keystr = dictGetKey(de);
        robj key, *o = dictGetVal(de);
        long long expire;
 
        initStaticStringObject(key,keystr);
        expire = getExpire(db,&key);
        if (rdbSaveKeyValuePair(&rdb,&key,o,expire,now) == -1) goto werr;
    }
    dictReleaseIterator(di);
}
……

Finally it writes the EOF opcode and the checksum, syncs the content to disk, and renames the file.

……
if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr;
 
/* CRC64 checksum. It will be zero if checksum computation is disabled, the
 * loading code skips the check in this case. */
cksum = rdb.cksum;
memrev64ifbe(&cksum);
rioWrite(&rdb,&cksum,8);
 
fflush(fp);
fsync(fileno(fp));
fclose(fp);
 
if (rename(tmpfile,filename) == -1) {
    redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
    unlink(tmpfile);
    return REDIS_ERR;
}
redisLog(REDIS_NOTICE,"DB saved on disk");
server.dirty = 0;
server.lastsave = time(NULL);
server.lastbgsave_status = REDIS_OK;
return REDIS_OK;
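
The write, flush, fsync, rename sequence is what keeps a crash from leaving a half-written dump under the final name. A minimal standalone sketch follows; `atomic_save` is a hypothetical name, and unlike the listing above it places the temporary file next to the target (an assumption made so that rename() stays on one filesystem).

```c
#include <assert.h>
#include <stdio.h>
#include <unistd.h>

/* Hypothetical helper: writes `len` bytes to `filename` via a temporary
 * file. Returns 0 on success, -1 on failure (the old file is untouched). */
int atomic_save(const char *filename, const void *data, size_t len) {
    char tmpfile[256];
    FILE *fp;
    int n;

    assert(filename != NULL);
    n = snprintf(tmpfile, sizeof(tmpfile), "%s.tmp-%d", filename, (int)getpid());
    if (n < 0 || (size_t)n >= sizeof(tmpfile)) return -1;

    fp = fopen(tmpfile, "w");
    if (!fp) return -1;
    if (len > 0 && fwrite(data, 1, len, fp) != len) goto werr;
    if (fflush(fp) != 0) goto werr;           /* stdio buffer -> kernel */
    if (fsync(fileno(fp)) != 0) goto werr;    /* kernel -> disk */
    if (fclose(fp) != 0) { unlink(tmpfile); return -1; }

    /* rename() swaps the new file in; readers see old or new, never half. */
    if (rename(tmpfile, filename) != 0) { unlink(tmpfile); return -1; }
    return 0;

werr:
    fclose(fp);
    unlink(tmpfile);
    return -1;
}
```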

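Let's look deeper into rdbSaveKeyValuePair(). It first checks for an expire time; if there is one, and it has not already passed, it writes the REDIS_RDB_OPCODE_EXPIRETIME_MS prefix and the expire time. Next it writes the type of the value (REDIS_RDB_TYPE_STRING, REDIS_RDB_TYPE_LIST, REDIS_RDB_TYPE_SET and so on), then the key in the String format described above, and finally calls rdbSaveObject() to serialize the value.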

int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val,
                        long long expiretime, long long now)
{
    if (expiretime != -1) {
        /* If this key is already expired skip it */
        if (expiretime < now) return 0;
        if (rdbSaveType(rdb,REDIS_RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
        if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
    }
 
    if (rdbSaveObjectType(rdb,val) == -1) return -1;
    if (rdbSaveStringObject(rdb,key) == -1) return -1;
    if (rdbSaveObject(rdb,val) == -1) return -1;
    return 1;
}
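
As a worked example of the record this function produces, the sketch below serializes one short string key with a short string value into a buffer. It is simplified on purpose: the names are hypothetical, strings must be shorter than 64 bytes so the length fits the one-byte form, integer and LZF encodings are skipped, and the 8-byte expire time is assumed to be written little-endian.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define OPCODE_EXPIRETIME_MS 252   /* REDIS_RDB_OPCODE_EXPIRETIME_MS */
#define TYPE_STRING 0              /* REDIS_RDB_TYPE_STRING */

/* Writes a string shorter than 64 bytes as [6-bit length][bytes]. */
static size_t put_short_string(unsigned char *p, const char *s) {
    size_t len = strlen(s);
    assert(len < 64);
    p[0] = (unsigned char)len;
    memcpy(p + 1, s, len);
    return 1 + len;
}

/* Hypothetical helper: serializes key -> val into buf. expire_ms is -1 when
 * the key has no expire. Returns the bytes written, or 0 if the key has
 * already expired and is skipped. */
size_t save_string_pair(unsigned char *buf, const char *key, const char *val,
                        long long expire_ms, long long now_ms) {
    size_t n = 0;
    int i;
    if (expire_ms != -1) {
        uint64_t t = (uint64_t)expire_ms;
        if (expire_ms < now_ms) return 0;          /* already expired */
        buf[n++] = OPCODE_EXPIRETIME_MS;
        for (i = 0; i < 8; i++)                    /* assumed little-endian */
            buf[n++] = (unsigned char)(t >> (8 * i));
    }
    buf[n++] = TYPE_STRING;                        /* type of the value */
    n += put_short_string(buf + n, key);           /* key, String format */
    n += put_short_string(buf + n, val);           /* value */
    return n;
}
```

Saving "key" with value "val" and no expire yields 9 bytes: 0x00, 0x03 "key", 0x03 "val". With an expire time the record grows by 9 bytes, the opcode plus the 8-byte timestamp.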

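rdbSaveStringObject(), the routine that saves a String key, is called not only for every key in a DB but also for the elements of lists, sets, zsets and hashes. It therefore checks the encoding of the robj first. For REDIS_ENCODING_INT it stores the integer using the one-byte size prefix described earlier. For REDIS_ENCODING_RAW it still tries integer storage (only for strings of at most 11 bytes, as the code shows); if the string is not an integer or the value does not fit the integer encodings, it is stored as a string.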

int rdbSaveStringObject(rio *rdb, robj *obj) {
    /* Avoid to decode the object, then encode it again, if the
     * object is alrady integer encoded. */
    if (obj->encoding == REDIS_ENCODING_INT) {
        return rdbSaveLongLongAsStringObject(rdb,(long)obj->ptr);
    } else {
        redisAssertWithInfo(NULL,obj,obj->encoding == REDIS_ENCODING_RAW);
        return rdbSaveRawString(rdb,obj->ptr,sdslen(obj->ptr));
    }
}
int rdbSaveRawString(rio *rdb, unsigned char *s, size_t len) {
    int enclen;
    int n, nwritten = 0;
 
    /* Try integer encoding */
    if (len <= 11) {
        unsigned char buf[5];
        if ((enclen = rdbTryIntegerEncoding((char*)s,len,buf)) > 0) {
            if (rdbWriteRaw(rdb,buf,enclen) == -1) return -1;
            return enclen;
        }
    }
 
    /* Try LZF compression - under 20 bytes it's unable to compress even
     * aaaaaaaaaaaaaaaaaa so skip it */
    if (server.rdb_compression && len > 20) {
        n = rdbSaveLzfStringObject(rdb,s,len);
        if (n == -1) return -1;
        if (n > 0) return n;
        /* Return value of 0 means data can't be compressed, save the old way */
    }
 
    /* Store verbatim */
    if ((n = rdbSaveLen(rdb,len)) == -1) return -1;
    nwritten += n;
    if (len > 0) {
        if (rdbWriteRaw(rdb,s,len) == -1) return -1;
        nwritten += len;
    }
    return nwritten;
}
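
The "try integer encoding" step hinges on one check: the string must be exactly the canonical decimal form of an integer, otherwise decoding it later would not give back the same bytes. A minimal sketch of that check, assuming a 32-bit limit and using the hypothetical name `parse_canonical_int32`:

```c
#include <assert.h>
#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Returns 1 and stores the value if the `len` bytes at `s` (not necessarily
 * NUL-terminated) are the canonical decimal text of a 32-bit signed
 * integer; returns 0 otherwise. */
int parse_canonical_int32(const char *s, size_t len, int32_t *out) {
    char in[12], back[32];
    char *end;
    long long v;

    assert(s != NULL && out != NULL);
    if (len == 0 || len > 11) return 0;   /* "-2147483648" has 11 chars */
    memcpy(in, s, len);
    in[len] = '\0';

    errno = 0;
    v = strtoll(in, &end, 10);
    if (errno != 0 || *end != '\0') return 0;
    if (v < INT32_MIN || v > INT32_MAX) return 0;

    /* Round trip: print the number and compare with the input. This
     * rejects forms such as "007", "+7" or " 7" that strtoll accepts. */
    snprintf(back, sizeof(back), "%lld", v);
    if (strlen(back) != len || memcmp(back, in, len) != 0) return 0;

    *out = (int32_t)v;
    return 1;
}
```

So "12345" can be stored as an integer, while "007" must stay a string because the integer 7 would be restored as "7".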

rdbSaveObject() checks the type of the value and saves it in the format described earlier for that type.

int rdbSaveObject(rio *rdb, robj *o) {
    int n, nwritten = 0;
 
    if (o->type == REDIS_STRING) {
        /* Save a string value */
        if ((n = rdbSaveStringObject(rdb,o)) == -1) return -1;
        nwritten += n;
    } else if (o->type == REDIS_LIST) {
        /* Save a list value */
        if (o->encoding == REDIS_ENCODING_ZIPLIST) {
            size_t l = ziplistBlobLen((unsigned char*)o->ptr);
 
            if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
            nwritten += n;
        } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
            list *list = o->ptr;
            listIter li;
            listNode *ln;
 
            if ((n = rdbSaveLen(rdb,listLength(list))) == -1) return -1;
            nwritten += n;
 
            listRewind(list,&li);
            while((ln = listNext(&li))) {
                robj *eleobj = listNodeValue(ln);
                if ((n = rdbSaveStringObject(rdb,eleobj)) == -1) return -1;
                nwritten += n;
            }
        } else {
            redisPanic("Unknown list encoding");
        }
    } else if (o->type == REDIS_SET) {
        /* Save a set value */
        if (o->encoding == REDIS_ENCODING_HT) {
            dict *set = o->ptr;
            dictIterator *di = dictGetIterator(set);
            dictEntry *de;
 
            if ((n = rdbSaveLen(rdb,dictSize(set))) == -1) return -1;
            nwritten += n;
 
            while((de = dictNext(di)) != NULL) {
                robj *eleobj = dictGetKey(de);
                if ((n = rdbSaveStringObject(rdb,eleobj)) == -1) return -1;
                nwritten += n;
            }
            dictReleaseIterator(di);
        } else if (o->encoding == REDIS_ENCODING_INTSET) {
            size_t l = intsetBlobLen((intset*)o->ptr);
 
            if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
            nwritten += n;
        } else {
            redisPanic("Unknown set encoding");
        }
    } else if (o->type == REDIS_ZSET) {
        /* Save a sorted set value */
        if (o->encoding == REDIS_ENCODING_ZIPLIST) {
            size_t l = ziplistBlobLen((unsigned char*)o->ptr);
 
            if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
            nwritten += n;
        } else if (o->encoding == REDIS_ENCODING_SKIPLIST) {
            zset *zs = o->ptr;
            dictIterator *di = dictGetIterator(zs->dict);
            dictEntry *de;
 
            if ((n = rdbSaveLen(rdb,dictSize(zs->dict))) == -1) return -1;
            nwritten += n;
 
            while((de = dictNext(di)) != NULL) {
                robj *eleobj = dictGetKey(de);
                double *score = dictGetVal(de);
 
                if ((n = rdbSaveStringObject(rdb,eleobj)) == -1) return -1;
                nwritten += n;
                if ((n = rdbSaveDoubleValue(rdb,*score)) == -1) return -1;
                nwritten += n;
            }
            dictReleaseIterator(di);
        } else {
            redisPanic("Unknown sorted set encoding");
        }
    } else if (o->type == REDIS_HASH) {
        /* Save a hash value */
        if (o->encoding == REDIS_ENCODING_ZIPLIST) {
            size_t l = ziplistBlobLen((unsigned char*)o->ptr);
 
            if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
            nwritten += n;
 
        } else if (o->encoding == REDIS_ENCODING_HT) {
            dictIterator *di = dictGetIterator(o->ptr);
            dictEntry *de;
 
            if ((n = rdbSaveLen(rdb,dictSize((dict*)o->ptr))) == -1) return -1;
            nwritten += n;
 
            while((de = dictNext(di)) != NULL) {
                robj *key = dictGetKey(de);
                robj *val = dictGetVal(de);
 
                if ((n = rdbSaveStringObject(rdb,key)) == -1) return -1;
                nwritten += n;
                if ((n = rdbSaveStringObject(rdb,val)) == -1) return -1;
                nwritten += n;
            }
            dictReleaseIterator(di);
 
        } else {
            redisPanic("Unknown hash encoding");
        }
 
    } else {
        redisPanic("Unknown object type");
    }
    return nwritten;
}

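That completes the work started by rdbSaveBackground(). After the child exits, the serverCron loop in the parent detects it and calls backgroundSaveDoneHandler(), which updates the RDB-related fields of the global server struct and then calls updateSlavesWaitingBgsave() to update slave state. If the save was triggered by a replication request from a slave, this is where Redis decides whether to send the RDB file to that slave.

How an RDB file is loaded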

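rdbLoad() is called to read the content when a slave receives an RDB file from its master, when a Redis instance restarts, or when another tool loads an RDB file into a Redis instance.

rdbLoad() is roughly the reverse of rdbSave(). It reads REDIS_MAGIC_NUMBER, checks that the version is compatible with the current code, and sets up checksum computation (if enabled) so that the file can be verified once reading has finished.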

int rdbLoad(char *filename) {
    uint32_t dbid;
    int type, rdbver;
    redisDb *db = server.db+0;
    char buf[1024];
    long long expiretime, now = mstime();
    long loops = 0;
    FILE *fp;
    rio rdb;
 
    fp = fopen(filename,"r");
    if (!fp) {
        errno = ENOENT;
        return REDIS_ERR;
    }
    rioInitWithFile(&rdb,fp);
    if (server.rdb_checksum)
        rdb.update_cksum = rioGenericUpdateChecksum;
    if (rioRead(&rdb,buf,9) == 0) goto eoferr;
    buf[9] = '\0';
    if (memcmp(buf,"REDIS",5) != 0) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
        errno = EINVAL;
        return REDIS_ERR;
    }
    rdbver = atoi(buf+5);
    if (rdbver < 1 || rdbver > REDIS_RDB_VERSION) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
        errno = EINVAL;
        return REDIS_ERR;
    }
 
    startLoading(fp);
    ……

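Continuing rdbLoad(): once every 1000 iterations it processes events so that clients are still served. Each iteration reads one byte. If it is REDIS_RDB_OPCODE_EXPIRETIME or REDIS_RDB_OPCODE_EXPIRETIME_MS, the object has an expire time: the expire time is read, followed by another one-byte type. If it is REDIS_RDB_OPCODE_EOF, the end of the data has been reached and the loop exits. If it is REDIS_RDB_OPCODE_SELECTDB, a new database begins and rdbLoadLen() reads its number; rdbLoadLen() is the inverse of rdbSaveLen() and decodes the compact length encoding. Otherwise rdbLoadStringObject() and rdbLoadObject() parse the key and the value. The pair is then checked for expiry: on a master, a pair that has already expired is discarded; otherwise it is added to the DB.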

……
while(1) {
        robj *key, *val;
        expiretime = -1;
 
        /* Serve the clients from time to time */
        if (!(loops++ % 1000)) {
            loadingProgress(rioTell(&rdb));
            aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
        }
 
        /* Read type. */
        if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
        if (type == REDIS_RDB_OPCODE_EXPIRETIME) {
            if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again. */
            if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
            /* the EXPIRETIME opcode specifies time in seconds, so convert
             * into milliesconds. */
            expiretime *= 1000;
        } else if (type == REDIS_RDB_OPCODE_EXPIRETIME_MS) {
            /* Milliseconds precision expire times introduced with RDB
             * version 3. */
            if ((expiretime = rdbLoadMillisecondTime(&rdb)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again. */
            if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
        }
 
        if (type == REDIS_RDB_OPCODE_EOF)
            break;
 
        /* Handle SELECT DB opcode as a special case */
        if (type == REDIS_RDB_OPCODE_SELECTDB) {
            if ((dbid = rdbLoadLen(&rdb,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            if (dbid >= (unsigned)server.dbnum) {
                redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
                exit(1);
            }
            db = server.db+dbid;
            continue;
        }
        /* Read key */
        if ((key = rdbLoadStringObject(&rdb)) == NULL) goto eoferr;
        /* Read value */
        if ((val = rdbLoadObject(type,&rdb)) == NULL) goto eoferr;
        /* Check if the key already expired. This function is used when loading
         * an RDB file from disk, either at startup, or when an RDB was
         * received from the master. In the latter case, the master is
         * responsible for key expiry. If we would expire keys here, the
         * snapshot taken by the master may not be reflected on the slave. */
        if (server.masterhost == NULL && expiretime != -1 && expiretime < now) {
            decrRefCount(key);
            decrRefCount(val);
            continue;
        }
        /* Add the new object in the hash table */
        dbAdd(db,key,val);
 
        /* Set the expire time if needed */
        if (expiretime != -1) setExpire(db,key,expiretime);
 
        decrRefCount(key);
    }
    ……
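
Since rdbLoadLen() is described as the inverse of rdbSaveLen(), a decoder for the compact length encoding can be sketched as below. It reads from a byte buffer instead of a rio stream, and the name `rdb_decode_len` and its parameters are assumptions for illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical helper: decodes a length prefix from p[0..avail). Returns the
 * number of bytes consumed, or 0 if the buffer is too short. When the top
 * bits are 11, *isencoded is set to 1 and *len holds the encoding type
 * (REDIS_RDB_ENC_*) instead of a length. */
size_t rdb_decode_len(const unsigned char *p, size_t avail,
                      uint32_t *len, int *isencoded) {
    assert(p != NULL && len != NULL && isencoded != NULL);
    if (avail < 1) return 0;
    *isencoded = 0;
    switch (p[0] >> 6) {
    case 0:   /* 00|LLLLLL */
        *len = p[0] & 0x3F;
        return 1;
    case 1:   /* 01|LLLLLL LLLLLLLL */
        if (avail < 2) return 0;
        *len = ((uint32_t)(p[0] & 0x3F) << 8) | p[1];
        return 2;
    case 2:   /* 10|000000 + 32-bit big-endian length */
        if (avail < 5) return 0;
        *len = ((uint32_t)p[1] << 24) | ((uint32_t)p[2] << 16) |
               ((uint32_t)p[3] << 8) | (uint32_t)p[4];
        return 5;
    default:  /* 11|TTTTTT: specially encoded object */
        *isencoded = 1;
        *len = p[0] & 0x3F;
        return 1;
    }
}
```

This is the same value the loading loop uses for the database number after REDIS_RDB_OPCODE_SELECTDB, and the `isencoded` flag is what lets the string loader tell an integer or LZF payload from a plain length.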

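Continuing rdbLoad(): after the whole RDB file has been read, the checksum computed while reading is compared with the checksum that was written at the end of the file when it was saved, to decide whether the file is intact.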

  ……
/* Verify the checksum if RDB version is >= 5 */
if (rdbver >= 5 && server.rdb_checksum) {
    uint64_t cksum, expected = rdb.cksum;
 
    if (rioRead(&rdb,&cksum,8) == 0) goto eoferr;
    memrev64ifbe(&cksum);
    if (cksum == 0) {
        redisLog(REDIS_WARNING,"RDB file was saved with checksum disabled: no check performed.");
    } else if (cksum != expected) {
        redisLog(REDIS_WARNING,"Wrong RDB checksum. Aborting now.");
        exit(1);
    }
}
 
fclose(fp);
stopLoading();
return REDIS_OK;

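Keys are parsed by rdbLoadStringObject(), which simply calls rdbGenericLoadStringObject(). It reads the length prefix, determines from it whether an integer encoding or a raw string follows, reads the following bytes accordingly, and rebuilds a String robj. The encode parameter says whether the result must be a raw string or may be integer-encoded. Because this call parses a DB-level key, it must return the raw string that represents the integer even when the stored value is an integer.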

robj *rdbLoadStringObject(rio *rdb) {
    return rdbGenericLoadStringObject(rdb,0);
}
robj *rdbGenericLoadStringObject(rio *rdb, int encode) {
    int isencoded;
    uint32_t len;
    sds val;
 
    len = rdbLoadLen(rdb,&isencoded);
    if (isencoded) {
        switch(len) {
        case REDIS_RDB_ENC_INT8:
        case REDIS_RDB_ENC_INT16:
        case REDIS_RDB_ENC_INT32:
            return rdbLoadIntegerObject(rdb,len,encode);
        case REDIS_RDB_ENC_LZF:
            return rdbLoadLzfStringObject(rdb);
        default:
            redisPanic("Unknown RDB encoding type");
        }
    }
 
    if (len == REDIS_RDB_LENERR) return NULL;
    val = sdsnewlen(NULL,len);
    if (len && rioRead(rdb,val,len) == 0) {
        sdsfree(val);
        return NULL;
    }
    return createObject(REDIS_STRING,val);
}
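
The integer branch (rdbLoadIntegerObject() in the listing) is not shown in this article. A sketch of what decoding such a payload involves is given below, assuming the bytes are stored least-significant first and sign-extended on read; the name `rdb_decode_integer` is hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Values copied from the REDIS_RDB_ENC_* macros. */
enum { ENC_INT8 = 0, ENC_INT16 = 1, ENC_INT32 = 2 };

/* Hypothetical helper: decodes the integer payload that follows an 11|0000TT
 * prefix. Returns the bytes consumed, or 0 on a short buffer or an unknown
 * encoding type. */
size_t rdb_decode_integer(int enctype, const unsigned char *p, size_t avail,
                          long long *out) {
    assert(p != NULL && out != NULL);
    switch (enctype) {
    case ENC_INT8:
        if (avail < 1) return 0;
        *out = (int8_t)p[0];                         /* sign-extend 8 bits */
        return 1;
    case ENC_INT16:
        if (avail < 2) return 0;
        *out = (int16_t)(uint16_t)(p[0] | (p[1] << 8));
        return 2;
    case ENC_INT32:
        if (avail < 4) return 0;
        *out = (int32_t)((uint32_t)p[0] | ((uint32_t)p[1] << 8) |
                         ((uint32_t)p[2] << 16) | ((uint32_t)p[3] << 24));
        return 4;
    default:
        return 0;
    }
}
```

When the caller needs a raw string, as for DB keys, the decoded number would then be printed back to decimal text; when an integer-encoded object is acceptable, the number can be kept as is.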

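rdbLoadObject(), which parses values, is more complicated. It takes the type marker that rdbLoad() has already read and parses the data according to it. However, it cannot simply build the in-memory object with the encoding named in the RDB file: it has to consult configuration values such as list_max_ziplist_entries to decide whether the physical encoding must change. The code excerpt below reads a String object and a list that was stored in the RDB file as a linkedlist. Because a value is being parsed here, the string that is read may be returned integer-encoded. When reading a list, the length that was read decides which list encoding to create; rdbLoadEncodedStringObject() then reads each element, integer-encoded whenever possible, and the element is added to the object according to the in-memory encoding in use. Adding to a ziplist requires decoding an integer-encoded string object and copying the raw string into the ziplist, whereas a linkedlist can take the robj directly.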

if (rdbtype == REDIS_RDB_TYPE_STRING) {
    /* Read string value */
    if ((o = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
    o = tryObjectEncoding(o);
} else if (rdbtype == REDIS_RDB_TYPE_LIST) {
    /* Read list value */
    if ((len = rdbLoadLen(rdb,NULL)) == REDIS_RDB_LENERR) return NULL;
 
    /* Use a real list when there are too many entries */
    if (len > server.list_max_ziplist_entries) {
        o = createListObject();
    } else {
        o = createZiplistObject();
    }
 
    /* Load every single element of the list */
    while(len--) {
        if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
 
        /* If we are using a ziplist and the value is too big, convert
         * the object to a real list. */
        if (o->encoding == REDIS_ENCODING_ZIPLIST &&
            ele->encoding == REDIS_ENCODING_RAW &&
            sdslen(ele->ptr) > server.list_max_ziplist_value)
                listTypeConvert(o,REDIS_ENCODING_LINKEDLIST);
 
        if (o->encoding == REDIS_ENCODING_ZIPLIST) {
            dec = getDecodedObject(ele);
            o->ptr = ziplistPush(o->ptr,dec->ptr,sdslen(dec->ptr),REDIS_TAIL);
            decrRefCount(dec);
            decrRefCount(ele);
        } else {
            ele = tryObjectEncoding(ele);
            listAddNodeTail(o->ptr,ele);
        }
    }
} else if (rdbtype == REDIS_RDB_TYPE_SET) {
    /* Read list/set value */
    if ((len = rdbLoadLen(rdb,NULL)) == REDIS_RDB_LENERR) return NULL;
 
    /* Use a regular set when there are too many entries. */
    if (len > server.set_max_intset_entries) {
        o = createSetObject();
        /* It's faster to expand the dict to the right size asap in order
         * to avoid rehashing */
        if (len > DICT_HT_INITIAL_SIZE)
            dictExpand(o->ptr,len);
    } else {
        o = createIntsetObject();
    }
 
    /* Load every single element of the list/set */
    for (i = 0; i < len; i++) {
        long long llval;
        if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL;
        ele = tryObjectEncoding(ele);
 
        if (o->encoding == REDIS_ENCODING_INTSET) {
            /* Fetch integer value from element */
            if (isObjectRepresentableAsLongLong(ele,&llval) == REDIS_OK) {
                o->ptr = intsetAdd(o->ptr,llval,NULL);
            } else {
                setTypeConvert(o,REDIS_ENCODING_HT);
                dictExpand(o->ptr,len);
            }
        }
 
        /* This will also be called when the set was just converted
         * to a regular hash table encoded set */
        if (o->encoding == REDIS_ENCODING_HT) {
            dictAdd((dict*)o->ptr,ele,NULL);
        } else {
            decrRefCount(ele);
        }
    }
}
……

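We have omitted the loading code for some types because it mirrors the work done when saving: it parses what was written. The interesting point here is why the author declares the types below in two separate groups. The macros 9 to 13 all denote raw byte sequences: in the RDB file they are prefixed with their byte length and their on-disk format is identical to the in-memory byte sequence, so on load they can be read as a string object whose content is copied directly into the resulting object. Note that in Redis 2.4 and earlier a small hash was stored as a zipmap, while Redis 2.6 uses a ziplist instead; to stay compatible, a zipmap found in an RDB file is converted to a ziplist on load. This compatibility code is expected to be dropped in the future (zipmap zmlen is too short).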

#define REDIS_RDB_TYPE_STRING 0
#define REDIS_RDB_TYPE_LIST   1
#define REDIS_RDB_TYPE_SET    2
#define REDIS_RDB_TYPE_ZSET   3
#define REDIS_RDB_TYPE_HASH   4
 
#define REDIS_RDB_TYPE_HASH_ZIPMAP    9
#define REDIS_RDB_TYPE_LIST_ZIPLIST  10
#define REDIS_RDB_TYPE_SET_INTSET    11
#define REDIS_RDB_TYPE_ZSET_ZIPLIST  12
#define REDIS_RDB_TYPE_HASH_ZIPLIST  13
……
 else if (rdbtype == REDIS_RDB_TYPE_HASH_ZIPMAP  ||
               rdbtype == REDIS_RDB_TYPE_LIST_ZIPLIST ||
               rdbtype == REDIS_RDB_TYPE_SET_INTSET   ||
               rdbtype == REDIS_RDB_TYPE_ZSET_ZIPLIST ||
               rdbtype == REDIS_RDB_TYPE_HASH_ZIPLIST)
    {
        robj *aux = rdbLoadStringObject(rdb);
 
        if (aux == NULL) return NULL;
        o = createObject(REDIS_STRING,NULL); /* string is just placeholder */
        o->ptr = zmalloc(sdslen(aux->ptr));
        memcpy(o->ptr,aux->ptr,sdslen(aux->ptr));
        decrRefCount(aux);
 
        /* Fix the object encoding, and make sure to convert the encoded
         * data type into the base type if accordingly to the current
         * configuration there are too many elements in the encoded data
         * type. Note that we only check the length and not max element
         * size as this is an O(N) scan. Eventually everything will get
         * converted. */
        switch(rdbtype) {
            case REDIS_RDB_TYPE_HASH_ZIPMAP:
                /* Convert to ziplist encoded hash. This must be deprecated
                 * when loading dumps created by Redis 2.4 gets deprecated. */
                {
                    unsigned char *zl = ziplistNew();
                    unsigned char *zi = zipmapRewind(o->ptr);
                    unsigned char *fstr, *vstr;
                    unsigned int flen, vlen;
                    unsigned int maxlen = 0;
 
                    while ((zi = zipmapNext(zi, &fstr, &flen, &vstr, &vlen)) != NULL) {
                        if (flen > maxlen) maxlen = flen;
                        if (vlen > maxlen) maxlen = vlen;
                        zl = ziplistPush(zl, fstr, flen, ZIPLIST_TAIL);
                        zl = ziplistPush(zl, vstr, vlen, ZIPLIST_TAIL);
                    }
 
                    zfree(o->ptr);
                    o->ptr = zl;
                    o->type = REDIS_HASH;
                    o->encoding = REDIS_ENCODING_ZIPLIST;
 
                    if (hashTypeLength(o) > server.hash_max_ziplist_entries ||
                        maxlen > server.hash_max_ziplist_value)
                    {
                        hashTypeConvert(o, REDIS_ENCODING_HT);
                    }
                }
                break;
            case REDIS_RDB_TYPE_LIST_ZIPLIST:
                o->type = REDIS_LIST;
                o->encoding = REDIS_ENCODING_ZIPLIST;
                if (ziplistLen(o->ptr) > server.list_max_ziplist_entries)
                    listTypeConvert(o,REDIS_ENCODING_LINKEDLIST);
                break;
            case REDIS_RDB_TYPE_SET_INTSET:
                o->type = REDIS_SET;
                o->encoding = REDIS_ENCODING_INTSET;
                if (intsetLen(o->ptr) > server.set_max_intset_entries)
                    setTypeConvert(o,REDIS_ENCODING_HT);
                break;
            case REDIS_RDB_TYPE_ZSET_ZIPLIST:
                o->type = REDIS_ZSET;
                o->encoding = REDIS_ENCODING_ZIPLIST;
                if (zsetLength(o) > server.zset_max_ziplist_entries)
                    zsetConvert(o,REDIS_ENCODING_SKIPLIST);
                break;
            case REDIS_RDB_TYPE_HASH_ZIPLIST:
                o->type = REDIS_HASH;
                o->encoding = REDIS_ENCODING_ZIPLIST;
                if (hashTypeLength(o) > server.hash_max_ziplist_entries)
                    hashTypeConvert(o, REDIS_ENCODING_HT);
                break;
            default:
                redisPanic("Unknown encoding");
                break;
        }
……

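Summary: in the design of RDB saving in Redis, saving space is the top priority. However, because Redis has so many data types and structures, many snapshot optimization techniques cannot be applied; the variety that is one of Redis's strengths becomes a problem for the RDB implementation.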

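REDIS_MAGIC_NUMBER | REDIS_RDB_OPCODE_SELECTDB | database number | REDIS_EXPIRETIME opcode (optional) | timestamp (optional) | type | key | object | …… | REDIS_EXPIRETIME opcode (optional) | timestamp (optional) | type | key | object | …… | REDIS_RDB_OPCODE_EOF | CHECKSUM

Because the types differ, the object itself is persisted in one of the following formats:

String, raw string: REDIS_RDB_TYPE_STRING | length encoding | raw string

String, integer: REDIS_RDB_TYPE_STRING | 110000(00, 01 or 10) | integer

String, LZF-compressed: REDIS_RDB_TYPE_STRING | 11000011 | compressed length (length encoding) | original length (length encoding) | LZF-compressed string

List, ziplist encoding: REDIS_RDB_TYPE_LIST_ZIPLIST | ziplist byte sequence

List, linkedlist encoding: REDIS_RDB_TYPE_LIST | list length (length encoding) | (element saved as String)+

Set, intset encoding: REDIS_RDB_TYPE_SET_INTSET | intset byte sequence

Set, hash table encoding: REDIS_RDB_TYPE_SET | set length (length encoding) | (element saved as String)+

Zset, ziplist encoding: REDIS_RDB_TYPE_ZSET_ZIPLIST | ziplist byte sequence

Zset, skiplist encoding: REDIS_RDB_TYPE_ZSET | zset length (length encoding) | (key saved as String | one byte holding the length of the double's string representation | the double's string representation)+

Hash, ziplist encoding: REDIS_RDB_TYPE_HASH_ZIPLIST | ziplist byte sequence

Hash, hash table encoding: REDIS_RDB_TYPE_HASH | hash length (length encoding) | (key saved as String | value saved as String)+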

Permalink: http://www.chepoo.com/redis-analysis-persistence-rdb.html | IT技术精华网
