接触swoole有一年了,一年前上singwa老师的课,用swoole结合redis实现了一个消息队列,但那个时候是使用TP来做的,TP对swoole的支持并不是特别的友好,在一年后的现在,使用kafka结合easyswoole异步定时任务已经多进程来实现一个高性能的消息队列服务,主要用来实现飞飞物联的设备逻辑(规则引擎),比如根据传感器的数据发短信等等。
首先连接kafka,这里的kaka我使用的百度云提供的kafka服务,自己部署太麻烦而且难以维护,连接的参考例子在这里https://github.com/BCEBIGDATA/kafka-sample-php ,其实最想使用的是微博的那个不使用扩展来连接kafka的库,但是一直没有解决使用ssl文件连接的问题,因此就是用了rdkafka扩展,首先按照样例中的说明安装librdkafka
1 2 3 |
sh setup-librdkafka.sh pecl install rdkafka echo "extension=rdkafka.so" >> /etc/php.ini //根据实际位置 |
这样就安装好了librdkafka和php扩展,要注意的是版本号必须要新一些的,否则使用ssl的会报没有该设置项的异常,排查这个异常花了一晚上的时间。
接下来在easyswoole创建一个连接kafka的基类,在飞飞物联的项目中只会使用到consumer,因为producer的数据是来自天工的设备数据
kafka.php – 连接kafka的基类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
namespace App\Lib\Kafka; use \RdKafka\Conf; use \RdKafka\KafkaConsumer; use Swoole\Exception; class Kafka { private $topic = ''; private $config = [ 'broker' => 'xxxxxxxxx:9092', 'security_protocol' => 'ssl', 'client_pem' => EASYSWOOLE_ROOT.'/App/Lib/Kafka/client.pem', 'client_key' => EASYSWOOLE_ROOT.'/App/Lib/Kafka/client.key', 'ca_pem' => EASYSWOOLE_ROOT.'/App/Lib/Kafka/ca.pem', 'group_id' => 'kafka-feifei-swoole-consumer' ]; public function __construct($topic) { if(!extension_loaded(rdkafka)){ throw new Exception('rdkafka.so扩展必须开启'); } if(!isset($topic) || empty($topic)){ throw new Exception('kafak实例化必须设置topic'); } $this->topic = $topic; } public function subscribe(){ $conf = new \RdKafka\Conf(); $conf->set('metadata.broker.list', $this->config['broker']); $conf->set('group.id', $this->config['group_id'].rand(0,10)); $conf->set('security.protocol', $this->config['security_protocol']); $conf->set('ssl.certificate.location', $this->config['client_pem']); $conf->set('ssl.key.location', $this->config['client_key']); $conf->set('ssl.ca.location', $this->config['ca_pem']); $consumer = new \RdKafka\KafkaConsumer($conf); $consumer->subscribe([$this->topic]); return $consumer; } } |
这里需要特别注意的是PHPstorm的代码检查器好像找不到rdkafka这个扩展,但是没有关系,我没可以在初始化这个类的时候判断一下扩展是否存在。这里只实现了消费者,要使用消费者需要实例化的时候传入消费者的topic,然后调用subscribe方法,接下来实际在easyswoole的mainServiceCreate中创建三个进程来处理kafka的订阅事件
1 2 3 4 5 6 7 8 9 |
public static function mainServerCreate(EventRegister $register) { // TODO: Implement mainServerCreate() method. // 注册Kafka消费事件, 开三个进程来处理 $allNum = 3; for($i = 0; $i < $allNum; $i++){ ServerManager::getInstance()->getSwooleServer()->addProcess((new Consumer("consumer_{$i}"))->getProcess()); } } |
这里new的Consumer就是处理消费的进程
Consumer.php
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
<?php /** * Created by bingxiong. * Date: 4/8/19 * Time: 10:22 PM * Description: */ namespace App\Lib\Kafka; use EasySwoole\Component\Process\AbstractProcess; class Consumer extends AbstractProcess { private $isRun = false; public function run($arg) { // 在这里处理kafka连接 // TODO: Implement run() method. $this->addTick(500,function (){ if(!$this->isRun){ $this->isRun = true; // 连接kafka并订阅TOPIC $kafka = new Kafka('xxxxxxxxxxxx');//topic $consumer = $kafka->subscribe(); while(true){ try{ $message = $consumer->consume(120*1000); if($message){ switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: echo 'process name is'.$this->getProcessName().'\n'; echo "partition:", $message->partition,", offset:", $message->offset,", ", $message->payload, "\n"; break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } }else{ break; } }catch (\Throwable $throwable){ break; } } $this->isRun = false; } var_dump($this->getProcessName().' task run check'); }); } public function onShutDown() { // TODO: Implement onShutDown() method. } public function onReceive(string $str, ...$args) { // TODO: Implement onReceive() method. } } |
这里使用了一个异步任务addTick,如果长期没有消息的话也会每500秒去检查一下有没有新的消息。这里还是用了一个死循环,在这里死循环中持续处理消息过来之后的逻辑
现在已经使用swoole+kafka拿到设备的数据了,接下来就是使用异步任务或者异步redis之类的去执行相应的业务逻辑了。