实现的消息队列主要是使用了easySwoole的多进程以及Redis的list特性来做的
第一部分 说一下easySwoole进程以及Redis的List
说明
- 插入使用rpush
- 获取使用lrange,这里的0 -1代表获取全部
- 也就是说消息队列里的消息借助于redis的list数据结构,把消息塞进list中,按照先进先出的方式,这样在消费者端就先获取1然后往后获取。其实也可以使用kpop从后面获取,这里我们使用lpop按照先进先出的方式进行消费。
其实easySwoole提供了如何实现队列消费/自定义进程 可以参考文档
第二部分 生产者 消费者端业务编写
这里实现的是将队列中的消息写入到日志和控制台输出,在这个文字可以控制器发邮件、发推送、发短信等等,只要把业务逻辑写到代码中的do your task的位置即可。
App->Lib->Process->ConsumerTest.php
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
<?php /** * Created by bingxiong. * Date: 12/21/18 * Time: 4:16 PM * Description: */ namespace App\Lib\Process; use EasySwoole\Core\Component\Di; use EasySwoole\Core\Swoole\Process\AbstractProcess; use Swoole\Process; use EasySwoole\Core\Component\Logger; class ConsumerTest extends AbstractProcess { private $isRun = false; public function run(Process $process) { // TODO: Implement run() method. /* * 举例,消费redis中的队列数据 * 定时500ms检测有没有任务,有的话就while死循环执行 * 每500ms执行一遍里面的逻辑 */ $this->addTick(500,function (){ if(!$this->isRun){ $this->isRun = true; $redis = new \redis();//此处为伪代码,请自己建立连接或者维护redis连接 while (true){ try{ $task = Di::getInstance()->get('REDIS')->lPop('bing_list'); if($task){ // do you task // 发送邮件 推送消息 写LOG var_dump($this->getProcessName()."---".$task); Logger::getInstance()->log($this->getProcessName().'--'.$task); }else{ break; } }catch (\Throwable $throwable){ break; } } $this->isRun = false; } // var_dump($this->getProcessName().' task run check'); }); } public function onShutDown() { // TODO: Implement onShutDown() method. } public function onReceive(string $str, ...$args) { // TODO: Implement onReceive() method. } } |
里面有两个redis基类的方法分别是入队列和出队列:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
<?php /** * Created by bingxiong. * Date: 12/21/18 * Time: 12:44 AM * Description: */ namespace App\Lib\Redis; // 使用单例模式 use EasySwoole\Config; use EasySwoole\Core\AbstractInterface\Singleton; class Redis { use Singleton; public $redis = ""; private function __construct() { //判断扩展有没有安装 if(!extension_loaded('redis')){ throw new \Exception("redis.so文件不存在"); } try{ // 从自己的配置加载 $redisConfig = Config::getInstance()->getConf("redis"); $this->redis = new \Redis(); $result = $this->redis->connect($redisConfig['host'],$redisConfig['port'],$redisConfig['time_out']); } catch (\Exception $e){ throw new \Exception("redis服务异常"); } if($result === false){ throw new \Exception("redis连接失败"); } } /** * 重写get友好的返回key不存在的情况 * @param $key * @return bool|string */ public function get($key){ if(empty($key)){ return ''; } return $this->redis->get($key); } /** * 出队列 * @param $key * @return string */ public function lPop($key){ if(empty($key)){ return ''; } return $this->redis->lPop($key); } /** * 入队列 * @param $key * @param $value * @return bool|int|string */ public function rPush($key,$value){ if(empty($key)){ return ''; } return $this->redis->rPush($key,$value); } } |
在EasySwooleEvent.php中开三个进程
1 2 3 4 5 6 7 8 9 10 |
public static function mainServerCreate(ServerManager $server,EventRegister $register): void { Di::getInstance()->set('REDIS',Redis::getInstance()); // 开三个线程 $allNum = 3; for ($i = 0 ;$i < $allNum;$i++){ ProcessManager::getInstance()->addProcess("consumer_test_{$i}",ConsumerTest::class); } } |
说明:
- 首先mainServerCreate这个方法是easySwoole的主服务创建事件,swoole server 起了之后就会执行这个主服务创建事件,在这个事件中可以通过Process Manager这个类的addProcess这个方法添加进程,第一个参数是进程的名字可以自定义,后面的Test::class相当于是要实例化的类。
1 |
ProcessManager<span class="token punctuation">:</span><span class="token punctuation">:</span><span class="token function">getInstance</span><span class="token punctuation">(</span><span class="token punctuation">)</span><span class="token operator">-</span><span class="token operator">></span><span class="token function">addProcess</span><span class="token punctuation">(</span><span class="token single-quoted-string string">'test_process_name'</span><span class="token punctuation">,</span>Test<span class="token punctuation">:</span><span class="token punctuation">:</span><span class="token keyword">class</span><span class="token punctuation">)</span><span class="token punctuation">;</span> |
- 这里的类ConsumerTest继承了一个抽象类,简单的理解就是addProcess就会在进程中执行这个类的方法,把这个方法放入进程中。可以看到这个类中有一个addTick的方法,这代表每500ms就检查一下有没有任务,有的话就while死循环执行。
- Logger::getInstance()->log(‘Log Content’);是easySwoole自带的日志写入方法,写入的日志在Log->default_date.log中
现在拿一个Demo来测试效果
App -> HttpController -> Index.php 这里执行得逻辑是获取GET过来的参数f,如果有这个参数,那么就把这个参数放入Redis的list中(如队列),那么就会在进程中执行写日志和控制台输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
<?php /** * Created by bingxiong. * Date: 12/19/18 * Time: 6:38 PM * Description: */ namespace App\HttpController\Api; use \EasySwoole\Core\Component\Di; class Index extends Base { public function pub(){ $params = $this->request()->getRequestParam(); Di::getInstance()->get("REDIS")->rPush('bing_list',$params['f']); } } |
然后就可以在控制台看到队列处理时和日志中(Log->default_xxxx.log)的输出:
补充:
1.进程和线程的区别
一句概括的总论:进程和线程都是一个时间段的描述,是CPU工作时间段的描述。
明确两个概念概念
- CPU执行程序是飞快的轮流执行:CPU执行速度非常快,其实CPU同时只能够执行一个任务,电脑、手机等设备执行多任务的逻辑其实是在很短的时间内CPU轮流执行,由于时间足够短,所以看起来就执行了多任务。
- CPU要保存程序上下文以便下次轮到是继续上次的任务:由于时间非常短的轮流执行,因此有可能这个程序的任务还没有执行完就要轮到下一个任务了,在这个情况下,切换任务的时候,CPU就会保存程序上下文,这样等下一次轮到这个程序的时候就可以接着执行。
所以执行逻辑是:先加载程序A的上下文,然后开始执行A,保存程序A的上下文,调入下一个要执行的程序B的程序上下文,然后开始执行B,保存程序B的上下文
这样进程和线程的概念就很好理解了:
进程就是包换上下文切换的程序执行时间总和 = CPU加载上下文+CPU执行+CPU保存上下文
进程的颗粒度太大,每次都要有上下的调入,保存,调出。如果我们把进程比喻为一个运行在电脑上的软件,那么一个软件的执行不可能是一条逻辑执行的,必定有多个分支和多个程序段,就好比要实现程序A,实际分成 a,b,c等多个块组合而成。那么这里具体的执行就可能变成:程序A得到CPU =》CPU加载上下文,开始执行程序A的a小段,然后执行A的b小段,然后再执行A的c小段,最后CPU保存A的上下文。这里a,b,c的执行是共享了A的上下文,CPU在执行的时候没有进行上下文切换的。这里的a,b,c就是线程,也就是说线程是共享了进程的上下文环境,的更为细小的CPU时间段。
2.既然可以用addProcess的方式来根据需要新开进程,那在配置文件config.php中设置worker_num开的这些进程拿来有什么作用呢?
reactor相当于nginx这样的服务器 负责接受数据 然后具体处理操作交给worker 如果有些耗时的操作可以交给task worker做异步执行addprocess相当于自定义进程 你可以让这个进程去做你想做的事情 在这个条件下就是做consumer执行消息队列中的任务 比如发邮件 做log等等