实现方案
实现的功能是图文直播的形式,直播员post一条数据,在相应客户端无刷显示出来,这里我们使用websocket来实现这个功能,使用异步任务来进行redis的存储以及push数据,也就是说这个websocket的后端实现是以异步Task的方式来进行,这是提高性能的关键。注意,这里只是核心实现的思路以及关键的代码,还有很多需要优化的点,如:
- 球队的数据应该使用mysql查询,这里是写死的。
- 重启服务的时候应该清楚掉redis的写入的clients,虽然不影响使用但是会尝试连接之前存储的clients id而出现报错。需要在ws.php的构造方法中查询redis存储客户端id的有序集合中是否有值,有值的或要把值删掉。
- 每场球赛的直播的之前的数据应该从数据库中提取出来,这里没有对之前的直播数据进行存储。
- 加入心跳来优化客户端断开连接的情况特别是移动端。
- 还有很多需要优化的点,这个实现的是核心的部分,其他的都不是大问题。
实现的具体思路:每次客户端连接的时候开启ws的时候把用户的连接的id记录在redis的有序集合中,直播员发布消息的时候进入异步任务,异步任务(高性能的关键)具体做两件事情,一是在redis的有序集合中查出来有哪些客户端连接了,找到连接的客户端之后通过拿到的连接id把数据通过WebSocket来push给页面,当用户断开连接的时候把这个连接的id从redis中有序集合中删除。注
意要需要在WebSocket服务端的添加一个构造方法,来在每次服务器重启的时候查询有序集合中是否有值,有值的话需要把值相应的删除,虽然我们在客户端断开连接的时候删除了客户端的连接id,但是如果服务终止,那么之前连接的WebSocket的用户连接id不会被删除,会导致再次push数据的时候服务端报错,因为服务端这个时候redis有序列表中还有用户的连接id,但是这个时候这些脏数据指向的客户端并实际上并没有连接。
part1 支持WebSocket
首先明确一点,WebSocket是建立在HTTP之下的,也就是说我们可以在HTTP中添加WebSocket。
在直播的场景中,我们需要使用websocket服务来使客户端在没有刷新的情况下显示数据。websocket这个机制是基于的http server的,因此如果起一个websocket的脚本它也会具有http的特性,这样就可以使用ws:localhost:8811来使用ws的协议了,并且http服务也是正常可以使用的,因此在之前的http server的基础上增加ws的支持。
ws.php
说明:
- 使用swoole开启websocket的同时也会开启Http,因此可以直接使用来替代之前的纯http server
- 注意document_root’ => “/Users/bingxiong/swoole/hdtocs/thinkphp/public/static”,定义了根目录,也就是说资源必须要在这里的子目录才能够访问
- onTask()这里做的是异步任务的分发机制,从而实现代码复用。
- 在onOpen的时候,也就是客户端打开的时候把客户端连接的id存储在redis中为了push数据给客户端的时候用,也就是我们需要知道有哪些客户端连接了我们的服务器,存在一个有序集合中。
- 在onClose的时候,把客户端的id从有序集合中删掉
- redis有序集合的成员数是2的32次方-1,不用担心数据库爆了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
<?php /** * Created by bingxiong. * Date: 5/19/18 * Time: 7:43 PM * Description: */ class Ws{ CONST HOST="0.0.0.0"; CONST PORT=8811; public $ws = null; public function __construct() { // 重启的时候要获取sMembers获取看看key有没有值,有的话要删掉 $this->ws = new swoole_websocket_server(self::HOST,self::PORT); $this->ws->set([ 'enable_static_handler' => true, // 'document_root' => "/Users/bingxiong/swoole/hdtocs/thinkphp/public/static", 'worker_num' => 4, 'task_worker_num' => 4, ]); $this->ws->on("workerstart",[$this,'onWorkerStart']); $this->ws->on("request",[$this,'onRequest']); $this->ws->on("open",[$this,'onOpen']); $this->ws->on("message",[$this,'onMessage']); $this->ws->on("task",[$this,'onTask']); $this->ws->on("finish",[$this,'onFinish']); $this->ws->on("close",[$this,'onClose']); $this->ws->start(); } /** * onWorkerStart的回调 * 加载框架文件 * @param $server * @param $worker_id */ public function onWorkerStart($server, $worker_id){ // 定义应用目录 define('APP_PATH', __DIR__ . '/../application/'); // 加载框架引导文件 require __DIR__ . '/../thinkphp/start.php'; } /** * request的回调 * @param $request * @param $response */ public function onRequest($request,$response){ // 将swoole中一些特别的用法装换成原生的php $_SERVER =[]; if(isset($request->server)){ foreach ($request->server as $k => $v){ $_SERVER[strtoupper($k)] = $v; } } $_GET = []; if(isset($request->get)){ foreach ($request->get as $k => $v){ $_GET[$k] = $v; } } $_FILES = []; if(isset($request->files)){ foreach ($request->files as $k => $v){ $_FILES[$k] = $v; } } $_POST = []; if(isset($request -> post)){ foreach ($request->server as $k => $v){ $_POST[$k] = $v; } } $_POST['http_server'] = $this->ws; // 执行框架中的内容 ob_start(); try { think\Container::get('app', [APP_PATH]) ->run() ->send(); }catch (\Exception $e){ // todo } $res = ob_get_contents(); ob_end_clean(); $response->end($res); } /** * @param $serv * @param $taskId * @param $workerId * @param $data * @return string */ public function onTask($serv,$taskId,$workerId,$data){ // 分发task任务机制,让不同的任务走不通的逻辑 $obj = new app\common\lib\task\Task; $method = $data['method']; $flag = $obj -> $method($data['data'],$serv); return $flag; // 告诉worker进程 } /** * @param $serv * @param $taskId * @param $data */ public function onFinish($serv,$taskId,$data){ echo "taskId:{$taskId}\n"; echo "finish-data-success:{$data}\n"; } /** * 监听ws打开事件 * @param $ws * @param $request */ public function onOpen($ws,$request){ // fd放到redis中 [1,2] \app\common\lib\redis\Predis::getInstance()->sAdd(config('redis.live_game_key'),$request->fd); var_dump($request->fd); } /** * 监听ws消息事件 * @param $ws * @param $frame */ public function onMessage($ws,$frame){ echo "server-push-message:{$frame->data}\n"; $ws->push($frame->fd,"server-push:".date("Y-m-d H:i:s")); } /** * 监听关闭事件 * @param $ws * @param $fd */ public function onClose($ws,$fd){ // fd 删除 \app\common\lib\redis\Predis::getInstance()->sRem(config('redis.live_game_key'),$fd); echo "clientId:{$fd}\n"; } } // 直接new来开启服务 new Ws(); |
Part 2 拼装数据
Live.php
- 这里还需要加入tocken
- 数据是写死的,这里进一步的优化应该使用mysql查出来
- 拼接好的数据放入$taskData中,注意这里是有一个method来做之前提到的异步任务分发
- 拼装好数据之后把数据发送给异步任务来执行WebSocket和redis入库
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
<?php /** * Created by bingxiong. * Date: 5/22/18 * Time: 9:46 PM * Description: */ namespace app\admin\controller; use app\common\lib\Util; use app\common\lib\redis\Predis; class Live { public function push(){ if(empty($_GET)){ return Util::show(config('code.error'),'error'); } // token md5(content) $teams = [ 1 => [ 'name' => '马刺', 'logo' => '/live/imgs/team1.png', ], 4 => [ 'name' => '火箭', 'logo' => '/live/imgs/team2.png', ], ]; $data = [ 'type' => intval($_GET['type']), 'title' => !empty($teams[$_GET['team_id']]['name']) ?$teams[$_GET['team_id']]['name'] : '直播员', 'logo' => !empty($teams[$_GET['team_id']]['logo']) ?$teams[$_GET['team_id']]['logo'] : '', 'content' => !empty($_GET['content']) ? $_GET['content'] : '', 'image' => !empty($_GET['image']) ? $_GET['image'] : '', ]; // 赛况数据使用Task异步任务入库redis $taskData = [ 'method' => 'pushLive', 'data' => $data ]; $_POST['http_server']->task($taskData); return Util::show(config('code.success','ok')); } public function sMembers($key){ return $this->redis->sMembers($key); } } |
Part 3 WebScoket的JavaScript
live.js
- 注意是在onMessage中调用push
- 注意WebSocket的地址ws://127.0.0.1:8811
- push()就是接收传过来的data然后拼接html给前端页面
- 注意push的调用push(evt.data);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
var wsUrl="ws://127.0.0.1:8811"; var websocket = new WebSocket(wsUrl); // 实例化对象的onopen属性 websocket.onopen = function(evt){ //发送一个数据 console.log("Swoole connected success!"); } // 实例化onmessage websocket.onmessage = function (evt) { //在这里调用push push(evt.data); console.log("ws-server-return-data:"+evt.data); } // 实例化onclose websocket.onclose = function (evt) { console.log("close"); } // 实例化onerror websocket.onerror = function(evt, e){ console.log("error:"+evt.data); } function push(data) { data = JSON.parse(data); //拼接html html = '<div class="frame">'; html += '<h3 class="frame-header">'; html += '<i class="icon iconfont icon-shijian"></i>第'+data.type+'节 01:30'; html += '</h3>'; html += '<div class="frame-item">'; html += '<span class="frame-dot"></span>'; html += '<div class="frame-item-author">'; if(data.logo) { html += '<img src="'+data.logo+'" width="20px" height="20px" />'; } html += data.title; html += '</div>'; html += '<p>'+data.content+'</p>'; html += '</div>'; html += '</div>'; $('#match-result').prepend(html); } |
part 4 异步任务
Task.php
- 异步任务通过method都分发到这里
- SendSms()方法是之前异步登录异步redis存储验证码的
- sMembers()方法是通过有序集合的形式来获取已经连上的客户端id
- 然后pushLive()直接把数据push给所有用户
- 其实swoole_server文档中有一个count可以直接获取到连接的客户端id(遍历),这里没有使用这种方式
- 这里是把连接的id放在redis里面,push数据到客户端的时候在把已经连接的id拿出来push
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
<?php /** * 代表的是 swoole里面后续的所有任务都放到这里来做 * Created by bingxiong. * Date: 5/19/18 * Time: 8:33 PM * Description: */ namespace app\common\lib\Task; use app\common\lib\ali\Sms; use app\common\lib\redis\Predis; use app\common\lib\Redis; class Task{ /** * @param $data * @param $sevr * @return bool */ public function sendSms($data,$sevr){ try{ $response = Sms::sendSms($data['phone'],$data['code']); }catch (\Exception $e) { return false; } // 如果发送成功吧验证码记录到redis里面 if($response->Code === "OK"){ Predis::getInstance()->set(Redis::smsKey($data['phone']),$data['code'],config('redis.out_time')); }else{ return false; } return true; } /** * @param $data * @param $serv swoole server 对象 */ public function pushLive($data,$serv){ // 获取所有用户 $clients = Predis::getInstance()->sMembers(config("redis.live_game_key")); print_r($clients); // 把数据POST给客户端,客户端使用websocket发送给在线用户 foreach ($clients as $fd){ $serv->push($fd,json_encode($data)); } } } |
Part 5 Predis中的方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
/** * @param $key * @param $value * @return int */ public function sAdd($key,$value){ return $this->redis->sAdd($key,$value); } /** * @param $key * @param $value * @return int */ public function sRem($key,$value){ return $this->redis->sRem($key,$value); } /** * @param $key * @return array */ public function sMembers($key){ return $this->redis->sMembers($key); } |