redis订阅和发布

redis通过PUBLISH、SUBSCRIBE等命令实现了订阅与发布模式,这个功能有两种信息机制,分别是订阅\发布到频道、订阅\发布到模式。

订阅\发布到频道:

频道的订阅与信息发送。

redis的subscribe命令可以让客户端订阅任意数量的频道,每当有信息发送到被订阅的频道时,信息就会被发送给所有订阅指定频道的客户端。

作为例子,下图展示了频道 channel1 ,以及订阅这个频道的三个客户端 —— client2client5client1 之间的关系:

digraph pubsub_relation {    rankdir = BT;    node [style = filled];    edge [style = bold];    channel1 [label = "channel1", fillcolor = "#A8E270"];    node [shape = box, fillcolor = "#95BBE3"];    client2 [label = "client2"];    client5 [label = "client5"];    client1 [label = "client1"];    client2 -> channel1 [label = "subscribe"];    client5 -> channel1 [label = "subscribe"];    client1 -> channel1 [label = "subscribe"];}

当有新消息通过 PUBLISH 命令发送给频道 channel1 时,这个消息就会被发送给订阅它的三个客户端:

digraph send_message_to_subscriber {        node [style = filled];    edge [style = "dashed, bold"];        message [label = "PUBLISH channel1 message", shape = plaintext, fillcolor = "#FADCAD"];    message -> channel1 [color = "#B22222]"];    channel1 [label = "channel1", fillcolor = "#A8E270"];    node [shape = box];    client2 [label = "client2", fillcolor = "#95BBE3"];    client5 [label = "client5", fillcolor = "#95BBE3"];    client1 [label = "client1", fillcolor = "#95BBE3"];    /*    client2 -> channel1 [label = "subscribe"];    client5 -> channel1 [label = "subscribe"];    client1 -> channel1 [label = "subscribe"];    */    channel1 -> client2 [label = "message", color = "#B22222"];    channel1 -> client5 [label = "message", color = "#B22222"];    channel1 -> client1 [label = "message", color = "#B22222"];}

在后面的内容中,我们将探讨 SUBSCRIBE PUBLISH 命令的实现,以及这套订阅与发布机制的运作原理。

订阅频道:

每个 Redis 服务器进程都维持着一个表示服务器状态的 redis.h/redisServer 结构,结构的 pubsub_channels 属性是一个字典,这个字典就用于保存订阅频道的信息。

其中,字典的键为正在被订阅的频道,字典的值则是一个链表,链表中保存了所有订阅这个频道的客户端。

比如说,在下图展示的这个 pubsub_channels 示例中, client2client5 client1 就订阅了 channel1 ,而其他频道也分别被别的客户端所订阅:

digraph pubsub {    rankdir = LR;    node [shape = record, style = filled];    edge [style = bold];    // keys    pubsub [label = "pubsub_channels |<channel1> channel1 |<channel2> channel2 |<channel3> channel3 | ... |<channelN> channelN", fillcolor = "#A8E270"];    // clients blocking for channel1    client1 [label = "client1", fillcolor = "#95BBE3"];    client5 [label = "client5", fillcolor = "#95BBE3"];    client2 [label = "client2", fillcolor = "#95BBE3"];    null_1 [label = "NULL", shape = plaintext];        pubsub:channel1 -> client2;    client2 -> client5;    client5 -> client1;    client1 -> null_1;    // clients blocking for channel2    client7 [label = "client7", fillcolor = "#95BBE3"];    null_2 [label = "NULL", shape = plaintext];    pubsub:channel2 -> client7;    client7 -> null_2;    // channel    client3 [label = "client3", fillcolor = "#95BBE3"];    client4 [label = "client4", fillcolor = "#95BBE3"];    client6 [label = "client6", fillcolor = "#95BBE3"];    null_3 [label = "NULL", shape = plaintext];    pubsub:channel3 -> client3;    client3 -> client4;    client4 -> client6;    client6 -> null_3;}

当客户端调用subscribe命令时,程序就将客户端和要订阅的频道在pubsub_channels字典中关联起来。

举个例子,如果客户端 client10086 执行命令 SUBSCRIBE channel1 channel2 channel3 ,那么前面展示的 pubsub_channels 将变成下面这个样子:

digraph new_subscribe {    rankdir = LR;    node [shape = record, style = filled];    edge [style = bold];    // keys    pubsub [label = "pubsub_channels |<channel1> channel1 |<channel2> channel2 |<channel3> channel3 | ... |<channelN> channelN", fillcolor = "#A8E270"];    // clients blocking for channel1    client1 [label = "client1", fillcolor = "#95BBE3"];    client5 [label = "client5", fillcolor = "#95BBE3"];    client2 [label = "client2", fillcolor = "#95BBE3"];    client10086 [label = "client10086", fillcolor = "#FFC1C1"];    client10086_1 [label = "client10086", fillcolor = "#FFC1C1"];    client10086_2 [label = "client10086", fillcolor = "#FFC1C1"];    null_1 [label = "NULL", shape = plaintext];    null_2 [label = "NULL", shape = plaintext];    null_3 [label = "NULL", shape = plaintext];        pubsub:channel1 -> client2;    client2 -> client5;    client5 -> client1;    client1 -> client10086;    client10086 -> null_1;    // clients blocking for channel2    client7 [label = "client7", fillcolor = "#95BBE3"];    pubsub:channel2 -> client7;    client7 -> client10086_1;    client10086_1 -> null_2;    // channel    client3 [label = "client3", fillcolor = "#95BBE3"];    client4 [label = "client4", fillcolor = "#95BBE3"];    client6 [label = "client6", fillcolor = "#95BBE3"];    pubsub:channel3 -> client3;    client3 -> client4;    client4 -> client6;    client6 -> client10086_2;    client10086_2 -> null_3;}

SUBSCRIBE 命令的行为可以用伪代码表示如下:

def SUBSCRIBE(client, channels):

    # 遍历所有输入频道
    for channel in channels:

        # 将客户端添加到链表的末尾
        redisServer.pubsub_channels[channel].append(client)
通过 pubsub_channels 字典,程序只要检查某个频道是否为字典的键,就可以知道该频道是否正在被客户端订阅;只要取出某个键的值,就可以得到所有订阅该频道的客户端的信息。

发送信息到频道:

了解了pubsub_channels字典的结构后,解释publish命令的实现就简单了,当调用publish channel message命令,程序首先根据channel定位到字典的键,然后将信息发送给字典值链表中的所有客户端。

例如:如果客户端执行命令publish channel "helloword"那么订阅了这个频道的客户端都会接收到helloword这个消息。

publish命令的伪代码实现:

def  PUBLISH (channel,message)

#遍历所有订阅该频道channel的客户端

for client in server.pubsub_channels[channelName]:

#将信息发送给这些客户端

send_message(client,message)

使用 UNSUBSCRIBE 命令可以退订指定的频道,这个命令执行的是订阅的反操作:它从 pubsub_channels 字典的给定频道(键)中,删除关于当前客户端的信息,这样被退订频道的信息就不会再发送给这个客户端。

实例:在客户端实时响应的数据,每次返回三个字段:message,频道名称,接收到的信息。


服务端发送的数据:


模式的订阅与发布:

当使用publish命令发送信息到某个频道的时候,不仅所有订阅了该频道的客户端会收到信息,如果有某个模式和这个频道匹配的话,那么所有订阅这个频道的客户端也会收到信息。

下图展示了一个带有频道和模式的例子,其中 tweet.shop.* 模式匹配了 tweet.shop.kindle 频道和 tweet.shop.ipad 频道,并且有不同的客户端分别订阅它们三个:

digraph pattern_relation {        rankdir = BT;    node [style = filled];    edge [style = bold];    kindle [label = "tweet.shop.kindle", fillcolor = "#A8E270"];    ipad [label = "tweet.shop.ipad", fillcolor = "#A8E270"];    node [shape = octagon];    pattern [label = "tweet.shop.*"];    pattern -> kindle [label = "match"];    pattern -> ipad [label = "match"];    node [shape = box];    client123 [fillcolor = "#95BBE3"];    client256 [fillcolor = "#95BBE3"];    clientX [fillcolor = "#95BBE3"];    clientY [fillcolor = "#95BBE3"];    client3333 [fillcolor = "#95BBE3"];    client4444 [fillcolor = "#95BBE3"];    client5555 [fillcolor = "#95BBE3"];    client123 -> pattern [label = "subscribe"];    client256 -> pattern [label = "subscribe"];    clientX -> kindle [label = "subscribe"];    clientY -> kindle [label = "subscribe"];    client3333 -> ipad [label = "subscribe"];    client4444 -> ipad [label = "subscribe"];    client5555 -> ipad [label = "subscribe"];}

当有信息发送到 tweet.shop.kindle 频道时,信息除了发送给 clientXclientY 之外,还会发送给订阅 tweet.shop.* 模式的 client123 client256

digraph send_message_to_pattern {      node [style = filled];    edge [style = bold];    // tweet.shop.ipad    ipad [label = "tweet.shop.ipad", fillcolor = "#A8E270"];    ipad -> pattern [label = "match", dir = back];    node [shape = box];    ipad -> client3333 [label = "subscribe", dir = back];    ipad -> client4444 [label = "subscribe", dir = back];    ipad -> client5555 [label = "subscribe", dir = back];    node [shape = plaintext];    message [label = "PUBLISH tweet.shop.kindle message", fillcolor = "#FADCAD"];    kindle [label = "tweet.shop.kindle", shape = ellipse, fillcolor = "#A8E270"];    pattern [label = "tweet.shop.*", shape = octagon];    message -> kindle [style = "bold, dashed", color = "#B22222"];    kindle -> pattern [style = "bold, dashed", color = "#B22222"];    node [shape = box];    kindle -> clientX [style = "bold, dashed", color = "#B22222", label = "message"];    kindle -> clientY [style = "bold, dashed", color = "#B22222", label = "message"];    pattern -> client123 [label = "message", style = "bold, dashed", color = "#B22222"];    pattern -> client256 [label = "message", style = "bold, dashed", color = "#B22222"];    // client color    client123 [fillcolor = "#95BBE3"];    client256 [fillcolor = "#95BBE3"];    clientX [fillcolor = "#95BBE3"];    clientY [fillcolor = "#95BBE3"];    client3333 [fillcolor = "#95BBE3"];    client4444 [fillcolor = "#95BBE3"];    client5555 [fillcolor = "#95BBE3"];}

另一方面,如果接收到信息的是频道 tweet.shop.ipad ,那么 client123client256 同样会收到信息:

digraph pattern_relation {        rankdir = BT;    node [style = filled];    edge [style = bold];    kindle [label = "tweet.shop.kindle", fillcolor = "#A8E270"];    ipad [label = "tweet.shop.ipad", fillcolor = "#A8E270"];    node [shape = octagon];    pattern [label = "tweet.shop.*"];    pattern -> kindle [label = "match"];    pattern -> ipad [style = "bold, dashed", color = "#B22222", dir = back];    node [shape = box];    client123 -> pattern [label = "message", dir = back, style= "bold, dashed", color = "#B22222"];    client256 -> pattern [label = "message", dir = back, style= "bold, dashed", color = "#B22222"];    clientX -> kindle [label = "subscribe"];    clientY -> kindle [label = "subscribe"];    client3333 -> ipad [label = "message", style = "bold, dashed", color = "#B22222", dir = back];    client4444 -> ipad [label = "message", style = "bold, dashed", color = "#B22222", dir = back];    client5555 -> ipad [label = "message", style = "bold, dashed", color = "#B22222", dir = back];    // new    publish [label = "PUBLISH tweet.shop.ipad message", shape = plaintext, fillcolor = "#FADCAD"];    ipad -> publish [style = "bold, dashed", color = "#B22222", dir = back];    // client color    client123 [fillcolor = "#95BBE3"];    client256 [fillcolor = "#95BBE3"];    clientX [fillcolor = "#95BBE3"];    clientY [fillcolor = "#95BBE3"];    client3333 [fillcolor = "#95BBE3"];    client4444 [fillcolor = "#95BBE3"];    client5555 [fillcolor = "#95BBE3"];}

订阅模式

redisServer.pubsub_patterns 属性是一个链表,链表中保存着所有和模式相关的信息:

struct redisServer {
    // ...
    list *pubsub_patterns;
    // ...
};

链表中的每个节点都包含一个 redis.h/pubsubPattern 结构:

typedef struct pubsubPattern {
    redisClient *client;
    robj *pattern;
} pubsubPattern;

client 属性保存着订阅模式的客户端,而 pattern 属性则保存着被订阅的模式。

每当调用 PSUBSCRIBE 命令订阅一个模式时,程序就创建一个包含客户端信息和被订阅模式的 pubsubPattern 结构,并将该结构添加到 redisServer.pubsub_patterns 链表中。

作为例子,下图展示了一个包含两个模式的 pubsub_patterns 链表,其中 client123 client256 都正在订阅 tweet.shop.* 模式:

digraph publish_pattern {        rankdir = LR;    node [shape = record, style = filled];    edge [style = bold];    redisServer [label = "redisServer| ... |<pubsub_patterns> pubsub_patterns | ...", fillcolor = "#A8E270"];    pubsubPattern_1 [label = "pubsubPattern | client \n client123 | pattern \n tweet.shop.*", fillcolor = "#95BBE3"];    pubsubPattern_2 [label = "pubsubPattern | client \n client256 | pattern \n tweet.shop.*", fillcolor = "#95BBE3"];    redisServer:pubsub_patterns -> pubsubPattern_1;    pubsubPattern_1 -> pubsubPattern_2;}

如果这时客户端 client10086 执行 PSUBSCRIBE broadcast.list.* ,那么 pubsub_patterns 链表将被更新成这样:

digraph pubsub_pattern {        rankdir = LR;    node [shape = record, style = filled];    edge [style = bold];    redisServer [label = "redisServer| ... |<pubsub_patterns> pubsub_patterns | ...", fillcolor = "#A8E270"];    pubsubPattern_1 [label = "pubsubPattern | client \n client123 | pattern \n tweet.shop.*", fillcolor = "#95BBE3"];    pubsubPattern_2 [label = "pubsubPattern | client \n client256 | pattern \n tweet.shop.*", fillcolor = "#95BBE3"];    pubsubPattern_3 [label = "pubsubPattern | client \n client10086 | pattern \n broadcast.live.*", fillcolor = "#FFC1C1"];    redisServer:pubsub_patterns -> pubsubPattern_1;    pubsubPattern_1 -> pubsubPattern_2;    pubsubPattern_2 -> pubsubPattern_3;}

通过遍历整个 pubsub_patterns 链表,程序可以检查所有正在被订阅的模式,以及订阅这些模式的客户端。

发送信息到模式

发送信息到模式的工作也是由 PUBLISH 命令进行的,在前面讲解频道的时候,我们给出了这样一段伪代码,说它定义了 PUBLISH 命令的行为:

def PUBLISH(channel, message):

    # 遍历所有订阅频道 channel 的客户端
    for client in server.pubsub_channels[channel]:

        # 将信息发送给它们
        send_message(client, message)

但是,这段伪代码并没有完整描述 PUBLISH 命令的行为,因为 PUBLISH 除了将 message 发送到所有订阅 channel 的客户端之外,它还会将 channel pubsub_patterns 中的模式进行对比,如果 channel 和某个模式匹配的话,那么也将 message 发送到订阅那个模式的客户端。

完整描述 PUBLISH 功能的伪代码定于如下:

def PUBLISH(channel, message):

    # 遍历所有订阅频道 channel 的客户端
    for client in server.pubsub_channels[channel]:

        # 将信息发送给它们
        send_message(client, message)

    # 取出所有模式,以及订阅模式的客户端
    for pattern, client in server.pubsub_patterns:

        # 如果 channel 和模式匹配
        if match(channel, pattern):

            # 那么也将信息发给订阅这个模式的客户端
            send_message(client, message)

举个例子,如果 Redis 服务器的 pubsub_patterns 状态如下:

digraph pubsub_pattern {        rankdir = LR;    node [shape = record, style = filled];    edge [style = bold];    redisServer [label = "redisServer| ... |<pubsub_patterns> pubsub_patterns | ...", fillcolor = "#A8E270"];    pubsubPattern_1 [label = "pubsubPattern | client \n client123 | pattern \n tweet.shop.*", fillcolor = "#95BBE3"];    pubsubPattern_2 [label = "pubsubPattern | client \n client256 | pattern \n tweet.shop.*", fillcolor = "#95BBE3"];    pubsubPattern_3 [label = "pubsubPattern | client \n client10086 | pattern \n broadcast.live.*", fillcolor = "#FFC1C1"];    redisServer:pubsub_patterns -> pubsubPattern_1;    pubsubPattern_1 -> pubsubPattern_2;    pubsubPattern_2 -> pubsubPattern_3;}

那么当某个客户端发送信息 "Amazon Kindle, $69." tweet.shop.kindle 频道时,除了所有订阅了 tweet.shop.kindle 频道的客户端会收到信息之外,客户端 client123 client256 也同样会收到信息,因为这两个客户端订阅的 tweet.shop.* 模式和 tweet.shop.kindle 频道匹配。

退订模式

使用 PUNSUBSCRIBE 命令可以退订指定的模式,这个命令执行的是订阅模式的反操作:程序会删除 redisServer.pubsub_patterns 链表中,所有和被退订模式相关联的 pubsubPattern 结构,这样客户端就不会再收到和模式相匹配的频道发来的信息。

小结

  • 订阅信息由服务器进程维持的 redisServer.pubsub_channels 字典保存,字典的键为被订阅的频道,字典的值为订阅频道的所有客户端。
  • 当有新消息发送到频道时,程序遍历频道(键)所对应的(值)所有客户端,然后将消息发送到所有订阅频道的客户端上。
  • 订阅模式的信息由服务器进程维持的 redisServer.pubsub_patterns 链表保存,链表的每个节点都保存着一个 pubsubPattern 结构,结构中保存着被订阅的模式,以及订阅该模式的客户端。程序通过遍历链表来查找某个频道是否和某个模式匹配。
  • 当有新消息发送到频道时,除了订阅频道的客户端会收到消息之外,所有订阅了匹配频道的模式的客户端,也同样会收到消息。
  • 退订频道和退订模式分别是订阅频道和订阅模式的反操作。
转载:http://redisbook.readthedocs.org/en/latest/feature/pubsub.html#id6
相关文章
相关标签/搜索
每日一句
    每一个你不满意的现在,都有一个你没有努力的曾经。
公众号推荐
   一个历史类的公众号,欢迎关注
一两拨千金