
Redis Core Explained: Implementing Transactions (MULTI and CAS)


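Redis supports transactions, but they differ from those of an RDBMS such as MySQL. The Transactions page on the official site describes them. Redis uses the MULTI and EXEC commands to mark the boundaries of a transaction; DISCARD, issued after MULTI, drops the transaction. In addition, the WATCH command provides CAS behavior.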

Redis transactions have the following characteristics:

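Commands between MULTI and EXEC are handled as one transaction: they run as a single atomic unit that commands from other clients cannot interrupt.
If a command between MULTI and EXEC fails, the transaction is not rolled back; every command other than the failing one still executes normally.
With AOF enabled, the commands between MULTI and EXEC are written to disk with a single write call. If that write is cut short (power loss or an administrator's forced kill), Redis detects the error on restart, and the file can be repaired with the redis-check-aof tool.
CAS (check and set) is supported: WATCH monitors a key and UNWATCH cancels the monitoring. If a watched key has been modified by the time the transaction executes, the transaction fails. Once EXEC has run, all watched keys are unwatched.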
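Why do Redis transactions not support rollback on error? Redis serves as a cache system, and commands written into a program normally do not fail (the possible errors are things like a wrong number of arguments or a wrong type). If one does fail, the program itself has a bug, which should generally not happen in production. Such transactions are also very simple to implement: Redis is single-threaded, so storing the transaction's commands and executing them together at the end is enough to make them atomic. Adding rollback would hurt Redis performance.

CAS applies to scenarios like the following:

watch stringA
stringA_r = stringA + "abcd"
multi
set stringA stringA_r
exec

This pattern is very common: once stringA is watched, if it changes, the transaction that follows fails.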
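
The no-rollback behavior can be illustrated with a small stand-alone model. This is only a sketch and not Redis code: `toyCmd` and `toyExec` are hypothetical names, and a "command" here simply adds to a counter or fails.

```c
#include <assert.h>

/* Hypothetical toy command: adds `delta` to a counter, or fails when
 * `fail` is non-zero (standing in for e.g. a wrong-type error). */
typedef struct toyCmd {
    int delta;
    int fail;
} toyCmd;

/* Run every queued command in order. A failing command is reported in
 * results[] (-1) but neither undoes earlier commands nor stops later
 * ones. Returns the number of failed commands. */
int toyExec(int *counter, const toyCmd *queue, int count, int *results) {
    int errors = 0;

    assert(counter != 0 && queue != 0 && results != 0);
    for (int j = 0; j < count; j++) {
        if (queue[j].fail) {
            results[j] = -1;            /* error reply for this command only */
            errors++;
        } else {
            *counter += queue[j].delta; /* effect is kept, never rolled back */
            results[j] = 0;             /* OK reply */
        }
    }
    return errors;
}
```

Queuing `{+1, fail, +10}` against a counter that starts at 0 leaves the counter at 11 with one reported error, which mirrors "all commands except the failing one execute normally".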

Relevant structure definitions

/* Client MULTI/EXEC state */
typedef struct multiCmd {
    robj **argv;
    int argc;
    struct redisCommand *cmd;
} multiCmd;
 
typedef struct multiState {
    multiCmd *commands;     /* Array of MULTI commands */
    int count;              /* Total number of MULTI commands */
} multiState;
 
typedef struct redisClient {
    ……
    multiState mstate;      /* MULTI/EXEC state */
    ……
} redisClient;
 
typedef struct watchedKey {
    robj *key;
    redisDb *db;
} watchedKey;

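The flags member of redisClient records the client's state, which may include REDIS_MULTI and REDIS_DIRTY_CAS. The former means the client is inside a transaction, so the commands it receives are stored rather than executed; the latter means a key watched by the client has been modified, so executing the transaction will fail.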

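Transaction implementation
processCommand is the function Redis calls to handle a received command; it pre-processes the command, decides how it should be executed, and calls call() to run it. When a client that is not in a transaction sends MULTI, c->flags does not have REDIS_MULTI set, so the MULTI command is executed directly. Afterwards c->flags has REDIS_MULTI set, and subsequent non-transaction commands are stored until a transaction command (EXEC, DISCARD, MULTI or WATCH) arrives. queueMultiCommand() stores each non-transaction command in the mstate member.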

int processCommand(redisClient *c) {
    ……
    /* Exec the command */
    if (c->flags & REDIS_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        call(c,REDIS_CALL_FULL);
        if (listLength(server.ready_keys))
            handleClientsBlockedOnLists();
    }
    ……
}
 
void queueMultiCommand(redisClient *c) {
    multiCmd *mc;
    int j;
 
    c->mstate.commands = zrealloc(c->mstate.commands,
            sizeof(multiCmd)*(c->mstate.count+1));
    mc = c->mstate.commands+c->mstate.count;
    mc->cmd = c->cmd;
    mc->argc = c->argc;
    mc->argv = zmalloc(sizeof(robj*)*c->argc);
    memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
    for (j = 0; j < c->argc; j++)
        incrRefCount(mc->argv[j]);
    c->mstate.count++;
}
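
The queueing step can be tried outside Redis with a simplified model. The sketch below assumes a ref-counted `toyObj` in place of robj and plain `realloc`/`malloc` in place of zrealloc/zmalloc; all names are hypothetical, and unlike the original it reports allocation failure instead of aborting.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for robj: a ref-counted string. */
typedef struct toyObj { int refcount; const char *ptr; } toyObj;

typedef struct queuedCmd { toyObj **argv; int argc; } queuedCmd;
typedef struct cmdQueue { queuedCmd *commands; int count; } cmdQueue;

/* Append one command: grow the array by one slot, copy the argv
 * pointers, and take a reference on every argument so the objects
 * outlive the caller's own argv. Returns 0 on success, -1 on failure. */
int queueAppend(cmdQueue *q, toyObj **argv, int argc) {
    assert(argc > 0);
    queuedCmd *grown = realloc(q->commands, sizeof(queuedCmd) * (q->count + 1));
    if (grown == NULL) return -1;
    q->commands = grown;

    queuedCmd *qc = &q->commands[q->count];
    qc->argv = malloc(sizeof(toyObj *) * argc);
    if (qc->argv == NULL) return -1;
    memcpy(qc->argv, argv, sizeof(toyObj *) * argc);
    for (int j = 0; j < argc; j++)
        argv[j]->refcount++;
    qc->argc = argc;
    q->count++;
    return 0;
}

/* Drop every queued command and release the references it held. */
void queueFree(cmdQueue *q) {
    for (int i = 0; i < q->count; i++) {
        for (int j = 0; j < q->commands[i].argc; j++)
            q->commands[i].argv[j]->refcount--;
        free(q->commands[i].argv);
    }
    free(q->commands);
    q->commands = NULL;
    q->count = 0;
}
```

Only the pointers are copied, not the argument objects; the reference count is what keeps a queued argument alive until the queue is freed.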

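multiCommand simply sets REDIS_MULTI on the client, so commands received afterwards are stored. The DISCARD command drops the stored commands and returns the client to the non-transaction state.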

void multiCommand(redisClient *c) {
    if (c->flags & REDIS_MULTI) {
        addReplyError(c,"MULTI calls can not be nested");
        return;
    }
    c->flags |= REDIS_MULTI;
    addReply(c,shared.ok);
}
void discardCommand(redisClient *c) {
    if (!(c->flags & REDIS_MULTI)) {
        addReplyError(c,"DISCARD without MULTI");
        return;
    }
    discardTransaction(c);
    addReply(c,shared.ok);
}
 
void discardTransaction(redisClient *c) {
    freeClientMultiState(c);
    initClientMultiState(c);
    c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS);
    unwatchAllKeys(c);
}
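
The MULTI/DISCARD state transitions reduce to a few bit operations. The following is a minimal sketch with hypothetical names (`txClient`, `txMulti`, `txQueue`, `txDiscard`); the flag values are illustrative and are not the real constants from redis.h.

```c
#include <assert.h>

/* Illustrative flag bits; not the real values from redis.h. */
enum { TX_MULTI = 1 << 0, TX_DIRTY_CAS = 1 << 1 };

typedef struct txClient {
    int flags;
    int queued;   /* number of commands stored since MULTI */
} txClient;

/* MULTI: enter the transaction state. Returns -1 on a nested MULTI. */
int txMulti(txClient *c) {
    if (c->flags & TX_MULTI) return -1;
    c->flags |= TX_MULTI;
    return 0;
}

/* Stand-in for queueMultiCommand(): only meaningful inside MULTI. */
int txQueue(txClient *c) {
    if (!(c->flags & TX_MULTI)) return -1;
    c->queued++;
    return 0;
}

/* DISCARD: drop the queued commands and clear both flags.
 * Returns -1 when the client is not inside MULTI. */
int txDiscard(txClient *c) {
    if (!(c->flags & TX_MULTI)) return -1;
    c->queued = 0;
    c->flags &= ~(TX_MULTI | TX_DIRTY_CAS);
    return 0;
}
```

Clearing both bits in one mask matches discardTransaction(): a discarded transaction also forgets that a watched key was touched.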

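WATCH is issued before MULTI; watchCommand() calls watchForKey() to watch each key. The mechanism is simple: every Redis DB holds a dict named watched_keys whose keys are the watched keys and whose values are lists of clients, and each client has a watched_keys list member whose elements are watchedKey structures recording the keys that client watches. watchForKey() first checks the client's watched_keys to see whether the key is already watched; if not, it adds the client under that key in the DB's watched_keys dict, then appends the new key to the client's own watched_keys. unwatchAllKeys() removes every key the client watches and removes the client from the corresponding DB's watched_keys.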

void unwatchCommand(redisClient *c) {
    unwatchAllKeys(c);
    c->flags &= (~REDIS_DIRTY_CAS);
    addReply(c,shared.ok);
}
 
void unwatchAllKeys(redisClient *c) {
    listIter li;
    listNode *ln;
 
    if (listLength(c->watched_keys) == 0) return;
    listRewind(c->watched_keys,&li);
    while((ln = listNext(&li))) {
        list *clients;
        watchedKey *wk;
 
        /* Lookup the watched key -> clients list and remove the client
         * from the list */
        wk = listNodeValue(ln);
        clients = dictFetchValue(wk->db->watched_keys, wk->key);
        redisAssertWithInfo(c,NULL,clients != NULL);
        listDelNode(clients,listSearchKey(clients,c));
        /* Kill the entry at all if this was the only client */
        if (listLength(clients) == 0)
            dictDelete(wk->db->watched_keys, wk->key);
        /* Remove this watched key from the client->watched list */
        listDelNode(c->watched_keys,ln);
        decrRefCount(wk->key);
        zfree(wk);
    }
}
 
void watchCommand(redisClient *c) {
    int j;
 
    if (c->flags & REDIS_MULTI) {
        addReplyError(c,"WATCH inside MULTI is not allowed");
        return;
    }
    for (j = 1; j < c->argc; j++)
        watchForKey(c,c->argv[j]);
    addReply(c,shared.ok);
}
 
void watchForKey(redisClient *c, robj *key) {
    list *clients = NULL;
    listIter li;
    listNode *ln;
    watchedKey *wk;
 
    /* Check if we are already watching for this key */
    listRewind(c->watched_keys,&li);
    while((ln = listNext(&li))) {
        wk = listNodeValue(ln);
        if (wk->db == c->db && equalStringObjects(key,wk->key))
            return; /* Key already watched */
    }
    /* This key is not already watched in this DB. Let's add it */
    clients = dictFetchValue(c->db->watched_keys,key);
    if (!clients) { 
        clients = listCreate();
        dictAdd(c->db->watched_keys,key,clients);
        incrRefCount(key);
    }
    listAddNodeTail(clients,c);
    /* Add the new key to the list of keys watched by this client */
    wk = zmalloc(sizeof(*wk));
    wk->key = key;
    wk->db = c->db;
    incrRefCount(key);
    listAddNodeTail(c->watched_keys,wk);
}
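
The two-way bookkeeping (key to clients, client to keys) can be modeled with fixed-size arrays. This is a simplified sketch under several assumptions: a single DB, string keys compared with `strcmp`, linear search instead of a dict, and a small `MAX_WATCH` capacity. All names are hypothetical.

```c
#include <assert.h>
#include <string.h>

#define MAX_WATCH 8

typedef struct wClient {
    int id;
    const char *watched[MAX_WATCH];   /* keys watched by this client */
    int nwatched;
} wClient;

typedef struct wEntry {
    const char *key;
    wClient *clients[MAX_WATCH];      /* clients watching this key */
    int nclients;
} wEntry;

typedef struct wDb {
    wEntry entries[MAX_WATCH];        /* stands in for db->watched_keys */
    int nentries;
} wDb;

wEntry *wFind(wDb *db, const char *key) {
    for (int i = 0; i < db->nentries; i++)
        if (strcmp(db->entries[i].key, key) == 0) return &db->entries[i];
    return NULL;
}

/* Returns 1 if newly watched, 0 if already watched, -1 if a table is full. */
int wWatch(wDb *db, wClient *c, const char *key) {
    for (int i = 0; i < c->nwatched; i++)
        if (strcmp(c->watched[i], key) == 0) return 0;
    if (c->nwatched == MAX_WATCH) return -1;

    wEntry *e = wFind(db, key);
    if (e == NULL) {                  /* first watcher: create the entry */
        if (db->nentries == MAX_WATCH) return -1;
        e = &db->entries[db->nentries++];
        e->key = key;
        e->nclients = 0;
    }
    if (e->nclients == MAX_WATCH) return -1;
    e->clients[e->nclients++] = c;    /* key -> client */
    c->watched[c->nwatched++] = key;  /* client -> key */
    return 1;
}

/* Remove the client from every entry; drop entries left without watchers. */
void wUnwatchAll(wDb *db, wClient *c) {
    for (int i = 0; i < c->nwatched; i++) {
        wEntry *e = wFind(db, c->watched[i]);
        assert(e != NULL);
        for (int j = 0; j < e->nclients; j++) {
            if (e->clients[j] == c) {
                e->clients[j] = e->clients[--e->nclients];
                break;
            }
        }
        if (e->nclients == 0)
            *e = db->entries[--db->nentries];
    }
    c->nwatched = 0;
}
```

The per-key index makes "who watches this key?" cheap when a key is modified, while the per-client list makes unwatching everything cheap when a transaction ends.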

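When a key in the DB is modified, signalModifiedKey() is called; when a flush is executed, signalFlushedDb() is called. Both check whether any client is watching the affected keys. For example, if touchWatchedKey() finds the modified key in watched_keys, it sets REDIS_DIRTY_CAS on every client watching it, and a subsequent EXEC will fail.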

void signalModifiedKey(redisDb *db, robj *key) {
    touchWatchedKey(db,key);
}
 
void signalFlushedDb(int dbid) {
    touchWatchedKeysOnFlush(dbid);
}
 
void touchWatchedKey(redisDb *db, robj *key) {
    list *clients;
    listIter li;
    listNode *ln;
 
    if (dictSize(db->watched_keys) == 0) return;
    clients = dictFetchValue(db->watched_keys, key);
    if (!clients) return;
 
    /* Mark all the clients watching this key as REDIS_DIRTY_CAS */
    /* Check if we are already watching for this key */
    listRewind(clients,&li);
    while((ln = listNext(&li))) {
        redisClient *c = listNodeValue(ln);
 
        c->flags |= REDIS_DIRTY_CAS;
    }
}
 
void touchWatchedKeysOnFlush(int dbid) {
    listIter li1, li2;
    listNode *ln;
 
    /* For every client, check all the waited keys */
    listRewind(server.clients,&li1);
    while((ln = listNext(&li1))) {
        redisClient *c = listNodeValue(ln);
        listRewind(c->watched_keys,&li2);
        while((ln = listNext(&li2))) {
            watchedKey *wk = listNodeValue(ln);
 
            /* For every watched key matching the specified DB, if the
             * key exists, mark the client as dirty, as the key will be
             * removed. */
            if (dbid == -1 || wk->db->id == dbid) {
                if (dictFind(wk->db->dict, wk->key->ptr) != NULL)
                    c->flags |= REDIS_DIRTY_CAS;
            }
        }
    }
}
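
The flush path differs from the single-key path: it walks every client and marks a client dirty only when a watched key actually exists in the DB being flushed. A minimal model of that rule follows, with hypothetical names and a flat array standing in for the keyspace.

```c
#include <assert.h>
#include <string.h>

typedef struct fKey { int dbid; const char *name; } fKey;

typedef struct fClient {
    fKey watches[4];   /* (db, key) pairs this client watches */
    int nwatches;
    int dirty;         /* models REDIS_DIRTY_CAS */
} fClient;

static int fKeyExists(const fKey *keyspace, int nkeys,
                      int dbid, const char *name) {
    for (int i = 0; i < nkeys; i++)
        if (keyspace[i].dbid == dbid && strcmp(keyspace[i].name, name) == 0)
            return 1;
    return 0;
}

/* Called before DB `dbid` is flushed; -1 means every DB. A client is
 * marked dirty only if one of its watched keys exists in a flushed DB,
 * because only then does the flush change what the client observed. */
void fTouchOnFlush(fClient *clients, int nclients,
                   const fKey *keyspace, int nkeys, int dbid) {
    assert(nclients >= 0 && nkeys >= 0);
    for (int i = 0; i < nclients; i++) {
        fClient *c = &clients[i];
        for (int j = 0; j < c->nwatches; j++) {
            const fKey *w = &c->watches[j];
            if (dbid != -1 && w->dbid != dbid) continue;
            if (fKeyExists(keyspace, nkeys, w->dbid, w->name))
                c->dirty = 1;
        }
    }
}
```

A client watching a key that does not exist is left clean by a flush, since removing nothing does not invalidate its view.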

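Finally, execCommand. It confirms that the client is in a transaction and that none of its watched keys has been modified, then executes all the stored commands. Afterwards the client returns to the normal state, and if any monitors are attached, the command is forwarded to them.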

void execCommand(redisClient *c) {
    int j;
    robj **orig_argv;
    int orig_argc;
    struct redisCommand *orig_cmd;
 
    if (!(c->flags & REDIS_MULTI)) {
        addReplyError(c,"EXEC without MULTI");
        return;
    }
 
    /* Check if we need to abort the EXEC if some WATCHed key was touched.
     * A failed EXEC will return a multi bulk nil object. */
    if (c->flags & REDIS_DIRTY_CAS) {
        freeClientMultiState(c);
        initClientMultiState(c);
        c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS);
        unwatchAllKeys(c);
        addReply(c,shared.nullmultibulk);
        goto handle_monitor;
    }
 
    /* Replicate a MULTI request now that we are sure the block is executed.
     * This way we'll deliver the MULTI/..../EXEC block as a whole and
     * both the AOF and the replication link will have the same consistency
     * and atomicity guarantees. */
    execCommandReplicateMulti(c);
 
    /* Exec all the queued commands */
    unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
    orig_argv = c->argv;
    orig_argc = c->argc;
    orig_cmd = c->cmd;
    addReplyMultiBulkLen(c,c->mstate.count);
    for (j = 0; j < c->mstate.count; j++) {
        c->argc = c->mstate.commands[j].argc;
        c->argv = c->mstate.commands[j].argv;
        c->cmd = c->mstate.commands[j].cmd;
        call(c,REDIS_CALL_FULL);
 
        /* Commands may alter argc/argv, restore mstate. */
        c->mstate.commands[j].argc = c->argc;
        c->mstate.commands[j].argv = c->argv;
        c->mstate.commands[j].cmd = c->cmd;
    }
    c->argv = orig_argv;
    c->argc = orig_argc;
    c->cmd = orig_cmd;
    freeClientMultiState(c);
    initClientMultiState(c);
    c->flags &= ~(REDIS_MULTI|REDIS_DIRTY_CAS);
    /* Make sure the EXEC command is always replicated / AOF, since we
     * always send the MULTI command (we can't know beforehand if the
     * next operations will contain at least a modification to the DB). */
    server.dirty++;
 
handle_monitor:
    /* Send EXEC to clients waiting data from MONITOR. We do it here
     * since the natural order of commands execution is actually:
     * MULTI, EXEC, ... commands inside transaction ...
     * Instead EXEC is flagged as REDIS_CMD_SKIP_MONITOR in the command
     * table, and we do it here with correct ordering. */
    if (listLength(server.monitors) && !server.loading)
        replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
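
Putting WATCH and EXEC together, the check-and-set flow can be sketched as follows. This is a toy model rather than Redis code: one integer key, at most one watcher, and a transaction that consists of a single SET. The names `casKey`, `casClient`, `casWatch`, `casWrite` and `casExecSet` are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

typedef struct casClient { int dirty; } casClient;   /* models REDIS_DIRTY_CAS */

typedef struct casKey {
    int value;
    casClient *watcher;   /* at most one watcher, for brevity */
} casKey;

/* WATCH: register the client as the key's watcher. */
void casWatch(casKey *k, casClient *c) { k->watcher = c; }

/* Any write to the key marks its watcher dirty. */
void casWrite(casKey *k, int v) {
    k->value = v;
    if (k->watcher != NULL) k->watcher->dirty = 1;
}

/* EXEC of a transaction holding one SET. Returns 1 if applied, 0 if
 * aborted because the watched key changed. Either way the client stops
 * watching and its dirty flag is cleared. */
int casExecSet(casKey *k, casClient *c, int newValue) {
    int aborted = c->dirty;

    if (k->watcher == c) k->watcher = NULL;   /* unwatch before running */
    c->dirty = 0;
    if (aborted) return 0;
    casWrite(k, newValue);
    return 1;
}
```

A caller would read the value after `casWatch`, compute the new value, and call `casExecSet`; on a return of 0 it watches again and retries with a fresh read.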

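Summary
Here we see the benefit of Redis being single-threaded: the extremely simple transaction implementation and the elegant CAS implementation both follow from it.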

Permalink: http://www.chepoo.com/redis-analysis-transaction-multi-cas.html | IT技术精华网
