0x0前言
发布-订阅模型将发布消息分为不同的类别,订阅者可以只接收感兴趣的消息,无须了解发布者的存在。
使用redis发布订阅,可以设定对某一个key值进行消息发布及消息订阅。当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应的消息。
0x1相关操作
在Redis中,发布订阅相关命令有:
1.发布消息
2.订阅频道
3.取消订阅
4.按照模式订阅
5.按照模式取消订阅
6.查询订阅信息
发布消息的命令是publish,语法是:
publish 频道名称 消息
订阅消息的命令是subscribe,订阅者可以订阅一个或者多个频道,语法是:
subscribe 频道名称 [频道名称 ...]
0x2 python实现
import redis
import time
import threading
class redisMsg(object):
def __init__(self, url="redis://127.0.0.1:6379/0", timeout=1):
redisPool = redis.ConnectionPool.from_url(url)
self.beat = None
self.timeout = timeout
self._client = redis.Redis(connection_pool=redisPool)
self._sub = None
def putmsg(self, channel, msg):
"""
发送消息到指定频道
:param channel:
:param msg:
:return:
"""
return self._client.publish(channel, msg)
def creatpub(self):
"""
创建订阅者
Return a Publish/Subscribe object.
:return:
"""
if self._sub is None:
self._sub = self._client.pubsub()
def startsub(self, channel, *channels, **function):
"""
启动订阅者监听多个频道,启动心跳线程
:param channel:
:param channels:
:param function:
:return:
"""
task = threading.Thread(target=self._runsub,args=(channel, function, *channels))
task.start()
if self.beat is None or not self.beat.is_alive():
self.beat = threading.Thread(target=self._heartbeat,args=())
self.beat.start()
def addchannel(self, channel, *channels, **function):
"""
添加监听频道
:param channel:
:param channels:
:param function:
:return:
"""
if self._sub is None:
self.startsub(channel, *channels, **function)
if self.beat.is_alive():
self.startsub(channel, *channels, **function)
else:
self._sub.subscibr(channel, *channels, **function)
def delchannel(self, *channels):
self._sub.unsubscribe(channels, *channels)
def _runsub(self, channel, function, *channels):
if self._sub is None:
self.creatpub()
self._sub.subscribe(channel, *channels, **function)
for msg in self._sub.listen():
print(msg)
def _heartbeat(self):
while True:
time.sleep(self.timeout)
channels = self._sub.channels
if not channels:
break
self._sub.ping()
def channel1(msg):
print("[*]channel1's callback procedure:".format(msg))
def channel2(msg):
print("[*]channel2's callback procedure".format(msg))
if __name__ == '__main__':
test = redisMsg()
test.startsub("channel1","channel2",channel1 = channel1,channel2 = channel2)
time.sleep(3)
test.putmsg("channel1","channel1 的测试消息")
time.sleep(2)
test.putmsg("channel2","channel2 的测试消息")
time.sleep(5)
test.addchannel("channel3")
test.putmsg("channel3","channel3 的测试消息")
# test.delchannel("channel1","channel2")