也许这是第一个支持 MQTT v5.0 协议的 PHP library...支持 MQTT 协议 3.1、3.1.1 和 5.0 版本,支持 QoS 0、QoS 1、QoS 2,那么它来了,使用 composer 来安装
composer require simps/mqtt安装成功之后我们来看一下订阅和发布的使用,以 MQTT5.0 为例
订阅
首先应该是订阅,订阅成功之后才能收到对应主题的发布消息,创建一个subscribe.php写入以下内容
include __DIR__ . '/vendor/autoload.php'; use Simps\MQTT\Hex\ReasonCode; use Swoole\Coroutine; use Simps\MQTT\Client; use Simps\MQTT\Types; $config = [ 'host' => 'broker.emqx.io', 'port' => 1883, 'time_out' => 5, 'user_name' => 'user001', 'password' => 'hLXQ9ubnZGzkzf', 'client_id' => Client::genClientID(), 'keep_alive' => 10, 'properties' => [ 'session_expiry_interval' => 60, 'receive_maximum' => 200, 'topic_alias_maximum' => 200, ], 'protocol_level' => 5, ]; Coroutine\run(function () use ($config) { $client = new Client($config, ['open_mqtt_protocol' => true, 'package_max_length' => 2 * 1024 * 1024]); while (!$data = $client->connect()) { Coroutine::sleep(3); $client->connect(); } $topics['simps-mqtt/user001/get'] = [ 'qos' => 1, 'no_local' => true, 'retain_as_published' => true, 'retain_handling' => 2, ]; $timeSincePing = time(); $res = $client->subscribe($topics); // 订阅的结果 var_dump($res); while (true) { $buffer = $client->recv(); if ($buffer && $buffer !== true) { $timeSincePing = time(); // 收到的数据包 var_dump($buffer); } if (isset($config['keep_alive']) && $timeSincePing < (time() - $config['keep_alive'])) { $buffer = $client->ping(); if ($buffer) { echo 'send ping success' . PHP_EOL; $timeSincePing = time(); } else { $client->close(); break; } } // QoS1 发布回复 if ($buffer['type'] === Types::PUBLISH && $buffer['qos'] === 1) { $client->send( [ 'type' => Types::PUBACK, 'message_id' => $buffer['message_id'], 'code' => ReasonCode::SUCCESS ] ); } } });执行php subscribe.php,就会得到这样的输出
array(3) { ["type"]=> int(9) ["message_id"]=> int(1) ["codes"]=> array(1) { [0]=> int(1) } }表示订阅成功,codes 对应的是对应订阅主题的 QoS 等级
发布
订阅成功之后,创建一个publish.php来测试发布
include __DIR__ . '/vendor/autoload.php'; use Swoole\Coroutine; use Simps\MQTT\Client; $config = [ 'host' => 'broker.emqx.io', 'port' => 1883, 'time_out' => 5, 'user_name' => 'user002', 'password' => 'adIJS1D482sd', 'client_id' => Client::genClientID(), 'keep_alive' => 20, 'properties' => [ 'session_expiry_interval' => 60, 'receive_maximum' => 200, 'topic_alias_maximum' => 200, ], 'protocol_level' => 5, ]; Coroutine\run(function () use ($config) { $client = new Client($config, ['open_mqtt_protocol' => true, 'package_max_length' => 2 * 1024 * 1024]); while (!$client->connect()) { Coroutine::sleep(3); $client->connect(); } while (true) { $response = $client->publish( 'simps-mqtt/user001/get', '{"time":' . time() . '}', 1, 0, 0, ['topic_alias' => 1] ); var_dump($response); Coroutine::sleep(3); } });代码的意思是每隔 3 秒给订阅的主题simps-mqtt/user001/get发布一次消息
打开一个新的终端窗口,执行php publish.php就会得到输出:
array(4) { ["type"]=> int(4) ["message_id"]=> int(1) ["code"]=> int(0) ["message"]=> string(7) "Success" }这里增加了 message,为了用户可读,不需要去查找对应的 code 含义是什么
返回到订阅的窗口,就会看到所打印的发布信息
array(8) { ["type"]=> int(3) ["topic"]=> string(0) "" ["message"]=> string(19) "{"time":1608017156}" ["dup"]=> int(1) ["qos"]=> int(1) ["retain"]=> int(0) ["message_id"]=> int(4) ["properties"]=> array(1) { ["topic_alias"]=> int(1) } }这样一个简单的发布订阅功能就实现了
在这个库中还有一些值得优化和还未完成的部分,如还没有支持 MQTT5 的Auth type,以及部分的properties还未支持
想参与的同学可以提交 PR,如果有问题也可以提交 Issue,让我们共同去建设 PHP 的生态
推荐学习:php视频教程
以上就是适用于PHP协议解析和协程客户端的是什么的详细内容!
查看更多关于适用于PHP协议解析和协程客户端的是什么的详细内容...
声明:本文来自网络,不代表【好得很程序员自学网】立场,转载请注明出处:http://www.haodehen.cn/did38203
适用于PHP协议解析和协程客户端的是什么
MQTT是一种基于发布/订阅模式的“轻量级”通讯协议,作为一种低开销、低带宽占用的即时通讯协议,已经成为物联网的重要组成部分,今天小编就带大家了解一下simps/mqtt。
阅读:54次