0x0前言

发布-订阅模型将发布消息分为不同的类别,订阅者可以只接收感兴趣的消息,无须了解发布者的存在。
使用redis发布订阅,可以设定对某一个key值进行消息发布及消息订阅。当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应的消息。

0x1相关操作

在Redis中,发布订阅相关命令有:
1.发布消息
2.订阅频道
3.取消订阅
4.按照模式订阅
5.按照模式取消订阅
6.查询订阅信息
发布消息的命令是publish,语法是:

publish 频道名称 消息

订阅消息的命令是subscribe,订阅者可以订阅一个或者多个频道,语法是:

subscribe 频道名称 [频道名称 ...]

0x2 python实现

import redis
import time
import threading

class redisMsg(object):

    def __init__(self, url="redis://127.0.0.1:6379/0", timeout=1):
        redisPool = redis.ConnectionPool.from_url(url)
        self.beat = None
        self.timeout = timeout
        self._client = redis.Redis(connection_pool=redisPool)
        self._sub = None
    def putmsg(self, channel, msg):
        """
        发送消息到指定频道
        :param channel:
        :param msg:
        :return:
        """
        return self._client.publish(channel, msg)
    def creatpub(self):
        """
        创建订阅者
        Return a Publish/Subscribe object.
        :return:
        """
        if self._sub is None:
            self._sub = self._client.pubsub()

    def startsub(self, channel, *channels, **function):
        """
        启动订阅者监听多个频道,启动心跳线程
        :param channel:
        :param channels:
        :param function:
        :return:
        """
        task = threading.Thread(target=self._runsub,args=(channel, function, *channels))
        task.start()
        if self.beat is None or not self.beat.is_alive():
            self.beat = threading.Thread(target=self._heartbeat,args=())
            self.beat.start()

    def addchannel(self, channel, *channels, **function):
        """
        添加监听频道
        :param channel:
        :param channels:
        :param function:
        :return:
        """
        if self._sub is None:
            self.startsub(channel, *channels, **function)
        if self.beat.is_alive():
            self.startsub(channel, *channels, **function)
        else:
            self._sub.subscibr(channel, *channels, **function)

    def delchannel(self, *channels):
        self._sub.unsubscribe(channels, *channels)

    def _runsub(self, channel, function, *channels):
        if self._sub is None:
            self.creatpub()
        self._sub.subscribe(channel, *channels, **function)
        for msg in self._sub.listen():
            print(msg)

    def _heartbeat(self):
        while True:
            time.sleep(self.timeout)
            channels = self._sub.channels
            if not channels:
                break
            self._sub.ping()

def channel1(msg):
    print("[*]channel1's callback procedure:".format(msg))

def channel2(msg):
    print("[*]channel2's callback procedure".format(msg))

if __name__ == '__main__':
    test = redisMsg()
    test.startsub("channel1","channel2",channel1 = channel1,channel2 = channel2)
    time.sleep(3)
    test.putmsg("channel1","channel1 的测试消息")
    time.sleep(2)
    test.putmsg("channel2","channel2 的测试消息")
    time.sleep(5)
    test.addchannel("channel3")
    test.putmsg("channel3","channel3 的测试消息")
    # test.delchannel("channel1","channel2")

image.png