当前位置: 首页 > redis, 分布式系统, 缓存系统 > 正文

Redis核心解读–pubsub(发布者-订阅者模式)的实现

关键字:
1 星2 星3 星4 星5 星 (6 次投票, 评分: 4.67, 总分: 5)
Loading ... Loading ...
baidu_share

Redis pubsub是publish-subscribe模式的一种实现。发布-订阅模式(publish-subscribe)是一种编程范式,发布方不发布消息给特定的接收方,而是由订阅方选择性接收。这使得发布方和订阅方相对独立,减少了耦合性。

Redis中的pubsub为实现这种范式提供了方法:

subscribe channel1, [channel2, channel3…] 客户端使用subscribe订阅channel,channel是一个字符串,可以代表一个类型的消息。服务器端会存储这个channel和客户端。并会返回订阅的channel和已经订阅的channel数。
unsubscribe [channel1, channel2, …] 客户端使用unsubscribe取消所有订阅或者特定channel。
publish channel message 客户端使用publish向特定channel发布message,所有符合channel的客户端都会收到该message。
psubscribe pattern1, [pattern2, pattern3] 客户端可以使用类似glob-style的匹配字符串类订阅相关channel,如news.*就相对于订阅了news.sports, news.art等。
punsubscribe [pattern1, pattern2, …] 客户端取消订阅全部或特定的pattern。
相关结构定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
struct redisServer {
    ……
    /* Pubsub */
    dict *pubsub_channels;  /* Map channels to list of subscribed clients */
    list *pubsub_patterns;  /* A list of pubsub_patterns */
    ……
}
 
typedef struct redisClient {
    ……
    dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
    list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */
    ……
}
 
typedef struct pubsubPattern {
    redisClient *client;
    robj *pattern;
} pubsubPattern;

redisServer这个全局结构中包含了pubsub_channels字典结构成员和pubsub_patterns列表,channel和pattern基本上是分开的两个部分,在这里pubsub_channels是键值对,channel代表被订阅的channel,而clients-list代表订阅这个channel的clients。pubsub_patterns是一个元素为pubsubPattern的列表,pubsubPattern包括了client和pattern,也就是每个客户端和其订阅的pattern作为一队放入pubsub_patterns列表中。 redisClient也会包含client的pubsub_channels和pubsub_patterns,这里的pubsub_channels是键值对,pubsub_patterns是字符串pattern的列表,这样,客户端只包含了他订阅的channels和patterns。

pubsub实现
当服务器接受到subscribe命令后,调用subscribeCommand()来对每一个参数channel进行订阅。pubsubSubscribeChannel()实现了客户端订阅channel的功能,首先它会检查客户端的pubsub_channels是否包含该channel,如果没有则查看server全局变量中server.pubsub_channels成员是否有相应对channel,如果没有,那么建立键值对。否则,向存在的clients-list中加入该客户端。最后在自己的pubsub_channels加入该channel。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void subscribeCommand(redisClient *c) {
    int j;
 
    for (j = 1; j < c->argc; j++)
        pubsubSubscribeChannel(c,c->argv[j]);
}
 
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
    struct dictEntry *de;
    list *clients = NULL;
    int retval = 0;
 
    /* Add the channel to the client -> channels hash table */
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);
        /* Add the client to the channel -> list of clients hash table */
        de = dictFind(server.pubsub_channels,channel);
        if (de == NULL) {
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }
        listAddNodeTail(clients,c);
    }
    /* Notify the client */
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.subscribebulk);
    addReplyBulk(c,channel);
    addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
    return retval;
}

unsubscribe命令如果没有其他参数,则会取消所有该客户端订阅的channels,否则取消订阅特定的channels。这里pubsubUnsubscribeAllChannels()取消订阅所有channels遍历了该客户端的pubsub_channels来取消订阅。pubsubUnsubscribeChannel()实现了取消特定channel订阅,是订阅channel实现的逆过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
void unsubscribeCommand(redisClient *c) {
    if (c->argc == 1) {
        pubsubUnsubscribeAllChannels(c,1);
        return;
    } else {
        int j;
        for (j = 1; j < c->argc; j++)
            pubsubUnsubscribeChannel(c,c->argv[j],1);
    }
}
 
int pubsubUnsubscribeAllChannels(redisClient *c, int notify) {
    dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
    dictEntry *de;
    int count = 0;
 
    while((de = dictNext(di)) != NULL) {
        robj *channel = dictGetKey(de);
 
        count += pubsubUnsubscribeChannel(c,channel,notify);
    }
    dictReleaseIterator(di);
    return count;
}
 
int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
    struct dictEntry *de;
    list *clients;
    listNode *ln;
    int retval = 0;
 
    incrRefCount(channel); 
    if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
        retval = 1;
        /* Remove the client from the channel -> clients list hash table */
        de = dictFind(server.pubsub_channels,channel);
        redisAssertWithInfo(c,NULL,de != NULL);
        clients = dictGetVal(de);
        ln = listSearchKey(clients,c);
        redisAssertWithInfo(c,NULL,ln != NULL);
        listDelNode(clients,ln);
        if (listLength(clients) == 0) {
            /* Free the list and associated hash entry at all if this was
             * the latest client, so that it will be possible to abuse
             * Redis PUBSUB creating millions of channels. */
            dictDelete(server.pubsub_channels,channel);
        }
    }
    /* Notify the client */
    if (notify) {
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.unsubscribebulk);
        addReplyBulk(c,channel);
        addReplyLongLong(c,dictSize(c->pubsub_channels)+
                       listLength(c->pubsub_patterns));
 
    }
    decrRefCount(channel); /* it is finally safe to release it */
    return retval;
}

类似的,pubsubSubscribePattern实现了pattern的订阅,首先会查找该客户端订阅的patterns是否存在该pattern,如果没有,那么去server. pubsub_patterns增加对应的pubsubPattern,最后在自己的pubsub_patterns增加新pattern。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
void unsubscribeCommand(redisClient *c) {
    if (c->argc == 1) {
        pubsubUnsubscribeAllChannels(c,1);
        return;
    } else {
        int j;
        for (j = 1; j < c->argc; j++)
            pubsubUnsubscribeChannel(c,c->argv[j],1);
    }
}
 
int pubsubUnsubscribeAllChannels(redisClient *c, int notify) {
    dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
    dictEntry *de;
    int count = 0;
 
    while((de = dictNext(di)) != NULL) {
        robj *channel = dictGetKey(de);
 
        count += pubsubUnsubscribeChannel(c,channel,notify);
    }
    dictReleaseIterator(di);
    return count;
}
 
int pubsubUnsubscribeChannel(redisClient *c, robj *channel, int notify) {
    struct dictEntry *de;
    list *clients;
    listNode *ln;
    int retval = 0;
 
    incrRefCount(channel); 
    if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
        retval = 1;
        /* Remove the client from the channel -> clients list hash table */
        de = dictFind(server.pubsub_channels,channel);
        redisAssertWithInfo(c,NULL,de != NULL);
        clients = dictGetVal(de);
        ln = listSearchKey(clients,c);
        redisAssertWithInfo(c,NULL,ln != NULL);
        listDelNode(clients,ln);
        if (listLength(clients) == 0) {
            /* Free the list and associated hash entry at all if this was
             * the latest client, so that it will be possible to abuse
             * Redis PUBSUB creating millions of channels. */
            dictDelete(server.pubsub_channels,channel);
        }
    }
    /* Notify the client */
    if (notify) {
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.unsubscribebulk);
        addReplyBulk(c,channel);
        addReplyLongLong(c,dictSize(c->pubsub_channels)+
                       listLength(c->pubsub_patterns));
 
    }
    decrRefCount(channel); /* it is finally safe to release it */
    return retval;
}

类似的,pubsubSubscribePattern实现了pattern的订阅,首先会查找该客户端订阅的patterns是否存在该pattern,如果没有,那么去server. pubsub_patterns增加对应的pubsubPattern,最后在自己的pubsub_patterns增加新pattern。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void psubscribeCommand(redisClient *c) {
    int j;
 
    for (j = 1; j < c->argc; j++)
        pubsubSubscribePattern(c,c->argv[j]);
}
int pubsubSubscribePattern(redisClient *c, robj *pattern) {
    int retval = 0;
 
    if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
        retval = 1;
        pubsubPattern *pat;
        listAddNodeTail(c->pubsub_patterns,pattern);
        incrRefCount(pattern);
        pat = zmalloc(sizeof(*pat));
        pat->pattern = getDecodedObject(pattern);
        pat->client = c;
        listAddNodeTail(server.pubsub_patterns,pat);
    }
    /* Notify the client */
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.psubscribebulk);
    addReplyBulk(c,pattern);
    addReplyLongLong(c,dictSize(c->pubsub_channels)+listLength(c->pubsub_patterns));
    return retval;
}

punsubscribe类似于unsubscribe的实现,也是psubscribe的逆过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
void punsubscribeCommand(redisClient *c) {
    if (c->argc == 1) {
        pubsubUnsubscribeAllPatterns(c,1);
        return;
    } else {
        int j;
 
        for (j = 1; j < c->argc; j++)
            pubsubUnsubscribePattern(c,c->argv[j],1);
    }
}
 
int pubsubUnsubscribeAllPatterns(redisClient *c, int notify) {
    listNode *ln;
    listIter li;
    int count = 0;
 
    listRewind(c->pubsub_patterns,&li);
    while ((ln = listNext(&li)) != NULL) {
        robj *pattern = ln->value;
 
        count += pubsubUnsubscribePattern(c,pattern,notify);
    }
    return count;
}
 
int pubsubUnsubscribePattern(redisClient *c, robj *pattern, int notify) {
    listNode *ln;
    pubsubPattern pat;
    int retval = 0;
 
    incrRefCount(pattern); /* Protect the object. May be the same we remove */
    if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) {
        retval = 1;
        listDelNode(c->pubsub_patterns,ln);
        pat.client = c;
        pat.pattern = pattern;
        ln = listSearchKey(server.pubsub_patterns,&pat);
        listDelNode(server.pubsub_patterns,ln);
    }
    /* Notify the client */
    if (notify) {
        addReply(c,shared.mbulkhdr[3]);
        addReply(c,shared.punsubscribebulk);
        addReplyBulk(c,pattern);
        addReplyLongLong(c,dictSize(c->pubsub_channels)+
                       listLength(c->pubsub_patterns));
    }
    decrRefCount(pattern);
    return retval;
}

最后我们来看publish命令的实现,通过订阅channel和pattern的过程我们可以推断出publish的实现,从server.pubsub_channels中找出跟publish中channel相符的clients-list,然后再去server.pubsub_patterns中找出每一个相符的pattern和client。向这些客户端发送publish的消息。 在这里我们可以看到如果cluster_enabled开启,那么publish命令也会传播到所有节点,目前Redis只是简单的实现了传播pulish的channel和message到所有节点,channel和message被其他接收到后,会被相应的节点调用pubsubPublishMessage()来向订阅的clients发布消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
void publishCommand(redisClient *c) {
    int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
    if (server.cluster_enabled) clusterPropagatePublish(c->argv[1],c->argv[2]);
    addReplyLongLong(c,receivers);
}
 
int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    struct dictEntry *de;
    listNode *ln;
    listIter li;
 
    /* Send to clients listening for that channel */
    de = dictFind(server.pubsub_channels,channel);
    if (de) {
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;
 
        listRewind(list,&li);
        while ((ln = listNext(&li)) != NULL) {
            redisClient *c = ln->value;
 
            addReply(c,shared.mbulkhdr[3]);
            addReply(c,shared.messagebulk);
            addReplyBulk(c,channel);
            addReplyBulk(c,message);
            receivers++;
        }
    }
    /* Send to clients listening to matching channels */
    if (listLength(server.pubsub_patterns)) {
        listRewind(server.pubsub_patterns,&li);
        channel = getDecodedObject(channel);
        while ((ln = listNext(&li)) != NULL) {
            pubsubPattern *pat = ln->value;
 
            if (stringmatchlen((char*)pat->pattern->ptr,
                                sdslen(pat->pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) {
                addReply(pat->client,shared.mbulkhdr[4]);
                addReply(pat->client,shared.pmessagebulk);
                addReplyBulk(pat->client,pat->pattern);
                addReplyBulk(pat->client,channel);
                addReplyBulk(pat->client,message);
                receivers++;
            }
        }
        decrRefCount(channel);
    }
    return receivers;
}

小结
从实现中我们可以看到在Redis中其实pubsub这个模块与Redis核心功能并无太多交集,是相对独立的一个模块。但是因为Redis高效的dict和list结构,使得Redis实现pubsub相对简单,并且因为pubsub主要是操纵字符串,Redis使用字符串也是很方便。因此,Redis提供了pubsub模块来帮助开发者更好的使用publisher-subscribe模式。

本文固定链接: http://www.chepoo.com/redis-analysis-pubsub.html | IT技术精华网

Redis核心解读–pubsub(发布者-订阅者模式)的实现:等您坐沙发呢!

发表评论