为了更好推送消息、做IM客服,在tp5引入workerman作为常驻内存的工具,经过整合后让workerman很好地融合在tp下面,不用独立部署和独立引入数据库操作类。
由于自己的框架是5.0.*版本,和5.1有区别,所以下载的是"topthink/think-worker": "1.0.*"(根据自己框架选择即可)。
操作步骤:
用composer安装worker(tp5.1以上不用加版本号)
composer require topthink/think-worker=1.0.*
2.用composer安装gateway
composer require workerman/gateway-worker
3.在网站根目录新增目录workman(自定义就行,里面的文件路径加载正常即可)
在workman新增service_all.php,内容如下
#!/usr/bin/env php <?php define('APP_PATH', __DIR__ . '/../application/'); #绑定模块,这是一个总模块,在linux直接启动这个就好了,windows需要用另外的独立打开(用bat那个文件启动也行) define('BIND_MODULE','push/Run'); # 加载框架引导文件 require __DIR__ . '/../thinkphp/start.php';
4.在application下面新增目录push/controller并新增Run.php类,内容如下:
<?php /** * 全部启动功能 * linux 下使用,全部运行功能,对应workerman/service_all.php */ namespace app\push\controller; use GatewayWorker\BusinessWorker; use Workerman\Worker; use GatewayWorker\Gateway; use GatewayWorker\Register; class Run extends Worker { public function __construct() { $this->start_register(); $this->start_business(); $this->start_gateway(); //运行所有Worker; Worker::runAll(); } //开始注册 private function start_register() { $register = new Register('text://0.0.0.0:8001'); $register->name = 'BS_SHOPRegister'; } //初始化 bussinessWorker 进程,注意最后的绑定事件,这样可以融合在自定义的模块 private function start_business() { $worker = new BusinessWorker(); $worker->name = 'BS_SHOPBusinessWorker'; $worker->count = 4; $worker->registerAddress = '127.0.0.1:8001'; //绑定事件监听类 $worker->eventHandler = '\app\push\controller\Events'; //异常日志目录 $worker::$logFile = RUNTIME_PATH.'log/workman.log'; } //初始化gateway private function start_gateway() { $gateway = new Gateway("websocket://0.0.0.0:8002"); $gateway->name = 'BS_SHOPGateway'; $gateway->count = 2; //lanIp是Gateway所在服务器的内网IP,默认填写127.0.0.1即可。多服务器分布式部署的时候需要填写真实的内网ip,不能填写127.0.0.1。注意:lanIp只能填写真实ip,不能填写域名或者其它字符串,无论如何都不能写0.0.0.0 $gateway->lanIp = '127.0.0.1'; $gateway->startPort = 2900; $gateway->registerAddress = '127.0.0.1:8001'; } }
注意:windows下不允许一个php文件运行多个进程,所以我拆开多个start服务在此下面,在workerman绑定多个service同时启动,文件打包在附件中:
5.启动服务(linux),建议先看6,创建好event文件先。
php service_all.php start -d
注意:windows下不允许一个php文件运行多个进程,所以我拆开多个service服务脚本在workerman下面,用bat命令启动多个用于在windows测试,文件打包在附件中。
6.创建event事件接收文件,在application/push/controller新增Events.php类,内容如下:
<?php namespace app\push\controller; use think\Controller; use think\Db; use GatewayWorker\Lib\Gateway; /** * 主要是处理 onConnect onMessage onClose 三个方法 */ class Events extends Controller { /** * 当客户端发来消息时触发 * @param int $client_id 连接id * @param mixed $message 具体消息 */ public static function onMessage($client_id, $message) { $message = @json_decode($message, true); if (empty($message['type'])){ return ; } $type = $message['type']; switch ($type) { case 'start': if (empty($message['bsid'])){ return ; } //需要绑定的ID,可以是用户登录的ID、uid、特殊标识 $bsid = $message['bsid']; //保存 $_SESSION['bs_id'] = $bsid; $_SESSION['bs_name'] = 'BS_SHOP_'.rand(1000,10000); //绑定UID到当前登录的id,方便推送消息 // 将当前链接与uid绑定 Gateway::bindUid($client_id, $bsid); // 通知当前客户端初始化 $message_data = array( 'type' => 'login', 'code' => 0, 'message' =>'success', 'time'=>time(), 'data'=>[ ] ); //给当前登录的id发送消息 // Gateway::sendToClient($client_id, json_encode($message_data,JSON_UNESCAPED_UNICODE)); //给所有在线用户发送消息 /*$message_data = array( 'type' => 'info', 'code' => 0, 'message' =>'欢迎新用户登录:'.$bsid, 'time'=>time(), 'data'=>[ ] ); Gateway::sendToAll(json_encode($message_data,JSON_UNESCAPED_UNICODE));*/ break; return; case 'ping': //心跳检测,返回ok Gateway::sendToClient($client_id, 'pong'); return; case 'test': //随机返回句子 $arr = [ 1 => '机遇对于有准备的头脑有特别的亲和力。', 2 => '不求与人相比,但求超越自己,要哭就哭出激动的泪水,要笑就笑出成长的性格!', 3 => '在你内心深处,还有无穷的潜力,有一天当你回首看时,你就会知道这绝对是真的。', 4 => '无论你觉得自己多么的了不起,也永远有人比你更强;无论你觉得自己多么的不幸,永远有人比你更加不幸。', 5 => '不要浪费你的生命,在你一定会后悔的地方上。', 6 => '放弃该放弃的是无奈,放弃不该放弃的是无能;不放弃该放弃的是无知,不放弃不该放弃的是执着。', 7 => '不要轻易用过去来衡量生活的幸与不幸!每个人的生命都是可以绽放美丽的,只要你珍惜。', 8 => '千万别迷恋网络游戏,要玩就玩好人生这场大游戏。', 9 => '过错是暂时的遗憾,而错过则是永远的遗憾!', 10 => '人生是个圆,有的人走了一辈子也没有走出命运画出的圆圈,其实,圆上的每一个点都有一条腾飞的切线。' ]; $k = array_rand($arr); $message_data = array( 'type' => 'text', 'code' => 0, 'message' =>'success', 'time'=>time(), 'data'=>[ 'content'=>$arr[$k] ] ); //给当前登录的id发送消息 Gateway::sendToClient($client_id, json_encode($message_data,JSON_UNESCAPED_UNICODE)); return; default: Gateway::sendToClient($client_id, json_encode(self::other_logic($message),JSON_UNESCAPED_UNICODE)); return; } return ; } //其它逻辑 private static function other_logic($message) { $data = array( 'type' => 'unknown', 'code' => 0, 'message' =>'未知消息类型', 'time'=>time(), 'data'=>[ 'content' =>'abcccc' ] ); return $data; } /** * 当客户端连接时触发 * @param int $client_id 连接id */ public static function onConnect($client_id) { } /** * 当连接断开时触发的回调函数 * @param $connection */ public static function onClose($client_id) { $message_data = array( 'type' => 'info', 'code' => 0, 'message' =>'用户退出:'.$_SESSION['bs_id'], 'data'=>[ 'time'=>time() ] ); Gateway::sendToAll(json_encode($message_data,JSON_UNESCAPED_UNICODE)); } /** * 当客户端的连接上发生错误时触发 * @param $connection * @param $code * @param $msg */ public static function onError($client_id, $code, $msg) { echo "error $code $msg\r\n"; } /** * 每个进程启动 * @param $worker */ public static function onWorkerStart($worker) { } }
7.新建test.html(任意地方),内容如下:
<!DOCTYPE html> <html lang="zh"> <head> <meta charset="UTF-8"> <meta http-equiv="Content-Type" content="text/html; charset=utf-8" /> <title>测试wesocket</title> <script src="https://cdn.bootcss.com/jquery/3.4.1/jquery.min.js"></script> <style> .abc{ padding: 8px; } </style> </head> <body> <div class="btn-group"> <button class="btn abc">发送消息1</button> <button class="send2" style="padding: 8px;">发送消息2</button> <div class="res"> <p>下面的返回的消息</p> <div class="bs-data" style="color: red;"></div> </div> </div> <script> var ws;//websocket实例 var lockReconnect = false;//避免重复连接 var wsUrl = 'ws:127.0.0.1:8002'; function createWebSocket(url) { try { ws = new WebSocket(url); initEventHandle(); } catch (e) { reconnect(url); } } function initEventHandle() { ws.onclose = function () { reconnect(wsUrl); }; ws.onerror = function () { reconnect(wsUrl); }; ws.onopen = function () { //心跳检测重置 heartCheck.reset().start(); login(); }; ws.onmessage = function (event) { //如果获取到消息,心跳检测重置 //拿到任何消息都说明当前连接是正常的 heartCheck.reset().start(); onmessage(event) } } function reconnect(url) { if(lockReconnect) return; lockReconnect = true; //没连接上会一直重连,设置延迟避免请求过多 setTimeout(function () { createWebSocket(url); lockReconnect = false; }, 2000); } //心跳检测 var heartCheck = { timeout: 10000,//10秒 timeoutObj: null, reset: function(){ clearTimeout(this.timeoutObj); return this; }, start: function(){ this.timeoutObj = setTimeout(function(){ //这里发送一个心跳,后端收到后,返回一个心跳消息, //onmessage拿到返回的心跳就说明连接正常 ws.send('{"type":"ping"}'); }, this.timeout) } } //初始化登录 function login(){ ws.send('{"type":"start","bsid":"bsshoptest"}'); } //消息接收 function onmessage(e){ console.log('收到消息啦',e.data) var data = e.data; if (data==='pong'){ return ; } data = $.parseJSON(data); var content = ''; switch (data.type){ case 'text': content = data.data.content; break; default: content = data.data.content?data.data.content: '未知'; break; } $(".bs-data").html(content) } createWebSocket(wsUrl); $(function (){ $(".btn").on('click',function () { return ws.send('{"type":"test"}'); }); $(".send2").on('click',function () { return ws.send('{"type":"abc"}'); }); }) </script> </body> </html>
8.打开test.html在浏览器测试
附件里面包括了上面截图的核心文件,即workerman目录和push目录下面的文件,不包括composer下载的workerman组件和tp框架。
注意事项:
仅限测试,使用前请自己修改端口、功能逻辑。