索引:
基于list的实现方式
基于publish/subscribe
实战
消息队列简介
消息队列:是消息的顺序集合。
比如网站的PV统计和查看,传统方式就是每个页面发一个AJAX然后mysql给PV+1。用户量非常大的时候,没有办法实时插入PV。
结合Redis消息队列的实现,也是每个用户访问的时候发送ajax到控制器,这个时候redis每次rpush pvlog,相当于直接往数组后面插入一万个行为,接下来用一个脚本运输处理pvlog,set pv查看时候get pv,如果想处理用户请求时间等等,同样可以这样异步处理。
常见场景和解决的问题:
- 应对流量峰值
- 异步消费(不定速的插入,生产和匀速的处理,消费)
- 解耦应用(不同来源的生产和同步去向的消费,基于publish/subscribe实现),即消息队列作为消息池,同时往里面写入的可能有多种数据,根据不同的场景来进行消费。
redis实现消息队列原理
使用redis实现的最主要优势是简单快捷,性能没有kafka高,但是安装简便,kafka性能高但是比较重,如果消息队列不是很多,比如说一个博客计算pv,那么kafka可能比整个项目还要大。
实现方式:
方法一: 基于list的实现方式
核心代码
没有用消息队列的方式,使用incrBy大概上限在1000万
1 2 3 4 5 6 7 8 9 10 11 12 |
<?php $redis = new Redis(); $redis->connect('127.0.0.1', 6380); $res->select(0); $key = 'pv:index'; // 看一下是不是没有,如果没有的话就设置成0 if(false === $redis->get($key)) { $redis->set($key, 0); } // 如果有了就增加1 注意incrBy的上线大概在1000万 $redis->incrBy($key,1); |
用list来实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
// 用list消息队列实现 $key = 'listpv:index'; $redis->rPush($key, 1); // 后台的cron来实现 // 先连接redis $redis = new Redis(); $redis->connect('127.0.0.1', 6380); $res->select(0); $key = 'pv:index'; // 用一个死循环 while(true) { if(false !== $redis->lPop($key)) { $redis->incrBy('pv:index'); } } // 然后在terminal中无线循环这个脚本 |
基于list来实现消息队列的特点:
与水库相似的地方:
- 水库的容量决定承载能力 — redis的容量决定业务的承载能力
- 每一滴随只可能经过一个闸门 — 每条消息只能被一个消费者消费
与水库不同的地方:
- 水库用于蓄水 — 一般要把消息全部消费掉
- 不要的随扔掉 — 处理失败的消息要做容错
方法二:基于publish/subscribe
- 频道固定,生产者和消费者不固定,可能一对多,也可能多对一,也可能多对多。
命令行的实现方式:
首先起两个redis cli,一个作为订阅,一个作为发布
订阅者:
1 |
SUBSCRIBE channel1 channel2 |
发布者
1 |
PUBLISH channel1 helloChannel |
1 |
PUBLISH channel2 helloChannel2 |
- 如果这个时候发布到一个没有被订阅的channel,那么这条消息就会丢失。
- 如果有多个订阅了同一个channel,但有信息发布到同一个channel的时候,他们都会受到
代码实现:
发布者:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
<?php // 发布者 $redis = new Redis(); $redis->connect('127.0.0.1', 6380); $res = $redis->publish('c1','hello c1'); echo "clients reading c1:{$res}\n"; $res = $redis->publish('c2','hello c2'); echo "clients reading c2:{$res}\n"; $res = $redis->publish('c3','hello c3'); echo "clients reading c3:{$res}\n"; |
监听者:
1 2 3 4 5 6 7 8 9 |
<?php $redis = new Redis(); $redis->connect('127.0.0.1', 6380); // 超时控制 $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); // 订阅 $redis->subscribe(['c1','c2'],function(Redis $instance, $channel, $message){ echo "received message form {$channel} : {$message}\n"; }) |
实战部分-生成内容页的质量分:
实现三个功能:
- 统计首页、列表页、内容页的PV
- 统计浏览时间超过5s的内容页
- 内容页的PV+1分,浏览时间超过5s+5分,不超过5秒-1分,生成内容页的质量分
前端部分:
1 2 3 4 5 6 7 8 9 10 |
<script> // ajax 访问ajax.php,给内容页增加PV $get('ajax.php?action=pv&from=article&aid=<?=$aid?>'); // 如果页面打开时间超过5秒,则发出统计 (function(){ $.get('ajax.php?action=get5&aid=<?=$aid?>'); },5000); </script> |
后端部分:
发布的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
<?php $action = $_GET['action']; $redis = new Redis(); $redis->connect('127.0.0.1', 6380); // 首页的PV $channelPvIndex = 'pv:index'; //内容页的pv $channelPvList ='pv:list'; // 内容页的PV $channelPvArticle = 'pv:article'; // 内容页浏览超过5秒 $channelGT5 = 'gt5:article'; if('pv' === $action) { $from = $_GET['from']; if('index' === $from) { $redis->publish($channelPVIndex, 1); } else if('list' === $from) { $tid = intval($_GET['tid']); $redis->publish($channelPvList, $tid); } else if('articel' === $_GET['aid']) { $aid = intval($_GET['aid']); $redis->publish($channelPvArticle, $aid); } } else if('gt5' == $action) { $aid = intval($GET['aid']) { $redis->publish($channelGT5, $aid); } } else { // unknown action } |
订阅的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
<?php // 订阅的实现 $redis = new Redis(); $redis->connect('127.0.0.1', 6380); // 首页的PV $channelPvIndex = 'pv:index'; //内容页的pv $channelPvList ='pv:list'; // 内容页的PV $channelPvArticle = 'pv:article'; // 内容页浏览超过5秒 $channelGT5 = 'gt5:article'; // 频道和PV的key的映射 $keyMap = [ $channelPVIndex => 'realtimepv:index', $channelPvList => 'realtimepv:list', $channelPvArticle => 'realtimepv:article' ]; $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); $redis->subscribe( [$channelPVIndex,$channelPvList,$channelPvArticle,$channelGT5], function(Redis $instance, $channel, $message) { // 注意在subscribe的回调中只能够执行订阅、取消订阅、模式订阅、模式取消订阅,无法执行incrBy // 尝试取消订阅命令(证明上面一句话) // $instance->unsubscribe([$channelName]); // 因此要想incryBy只能重新实例化一个redis $redis2 = new Redis(); $redis2->connect('127.0.0.1',6380); global $keyMap; //这里可以使用闭包实现 if(!isset($$keyMap[$channelName])) { $realTimePvKey = $keyMap[$channelName]; // 映射过来 $redis2->incrBy($realTimePvKey, 1); } } ) |
注意在subscribe的回调中只能够执行订阅、取消订阅、模式订阅、模式取消订阅,无法执行incrBy,因此要想incryBy只能重新实例化一个redis
计算质量分:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
<?php // 订阅的实现 $redis = new Redis(); $redis->connect('127.0.0.1', 6380); // 内容页的PV $channelPvArticle = 'pv:article'; // 内容页浏览超过5秒 $channelGT5 = 'gt5:article'; $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); $redis->subscribe([$channelPVIndex,$channelPvList,$channelPvArticle,$channelGT5], function(Redis $instance, $channel, $message){ /** * 使用pv和gt5的数量计算文章的质量分 * 1. 如何计算? = gt5*6 * 2. 以什么形式保存? HASH * gt5: int * score: int */ $redis2 = new Redis(); $redis2->connect('127.0.0.1',6380); global $keyMap; //这里可以使用闭包实现 if('gt5:article' === $channelName) { echo "${channelName}\n"; $key = 'realtimescore:'.intval($message); $res = $redis2->hIncrBy($keym 'gt5', 1); echo "${channelName}\n"; if($res) { $score = $res * 6; $redis2->hSet($key, 'score', $score); echo "{$score}\n"; } else { // 报警 } } } ) |