为了更好推送消息、做IM客服,在tp5引入workerman作为常驻内存的工具,经过整合后让workerman很好地融合在tp下面,不用独立部署和独立引入数据库操作类。
由于自己的框架是5.0.*版本,和5.1有区别,所以下载的是"topthink/think-worker": "1.0.*"(根据自己框架选择即可)。
操作步骤:
用composer安装worker(tp5.1以上不用加版本号)
composer require topthink/think-worker=1.0.*
2.用composer安装gateway
composer require workerman/gateway-worker
3.在网站根目录新增目录workman(自定义就行,里面的文件路径加载正常即可)
在workman新增service_all.php,内容如下
#!/usr/bin/env php
<?php
define('APP_PATH', __DIR__ . '/../application/');
#绑定模块,这是一个总模块,在linux直接启动这个就好了,windows需要用另外的独立打开(用bat那个文件启动也行)
define('BIND_MODULE','push/Run');
# 加载框架引导文件
require __DIR__ . '/../thinkphp/start.php';4.在application下面新增目录push/controller并新增Run.php类,内容如下:
<?php
/**
* 全部启动功能
* linux 下使用,全部运行功能,对应workerman/service_all.php
*/
namespace app\push\controller;
use GatewayWorker\BusinessWorker;
use Workerman\Worker;
use GatewayWorker\Gateway;
use GatewayWorker\Register;
class Run extends Worker
{
public function __construct()
{
$this->start_register();
$this->start_business();
$this->start_gateway();
//运行所有Worker;
Worker::runAll();
}
//开始注册
private function start_register()
{
$register = new Register('text://0.0.0.0:8001');
$register->name = 'BS_SHOPRegister';
}
//初始化 bussinessWorker 进程,注意最后的绑定事件,这样可以融合在自定义的模块
private function start_business()
{
$worker = new BusinessWorker();
$worker->name = 'BS_SHOPBusinessWorker';
$worker->count = 4;
$worker->registerAddress = '127.0.0.1:8001';
//绑定事件监听类
$worker->eventHandler = '\app\push\controller\Events';
//异常日志目录
$worker::$logFile = RUNTIME_PATH.'log/workman.log';
}
//初始化gateway
private function start_gateway()
{
$gateway = new Gateway("websocket://0.0.0.0:8002");
$gateway->name = 'BS_SHOPGateway';
$gateway->count = 2;
//lanIp是Gateway所在服务器的内网IP,默认填写127.0.0.1即可。多服务器分布式部署的时候需要填写真实的内网ip,不能填写127.0.0.1。注意:lanIp只能填写真实ip,不能填写域名或者其它字符串,无论如何都不能写0.0.0.0
$gateway->lanIp = '127.0.0.1';
$gateway->startPort = 2900;
$gateway->registerAddress = '127.0.0.1:8001';
}
}注意:windows下不允许一个php文件运行多个进程,所以我拆开多个start服务在此下面,在workerman绑定多个service同时启动,文件打包在附件中:

5.启动服务(linux),建议先看6,创建好event文件先。
php service_all.php start -d
注意:windows下不允许一个php文件运行多个进程,所以我拆开多个service服务脚本在workerman下面,用bat命令启动多个用于在windows测试,文件打包在附件中。

6.创建event事件接收文件,在application/push/controller新增Events.php类,内容如下:
<?php
namespace app\push\controller;
use think\Controller;
use think\Db;
use GatewayWorker\Lib\Gateway;
/**
* 主要是处理 onConnect onMessage onClose 三个方法
*/
class Events extends Controller
{
/**
* 当客户端发来消息时触发
* @param int $client_id 连接id
* @param mixed $message 具体消息
*/
public static function onMessage($client_id, $message)
{
$message = @json_decode($message, true);
if (empty($message['type'])){
return ;
}
$type = $message['type'];
switch ($type) {
case 'start':
if (empty($message['bsid'])){
return ;
}
//需要绑定的ID,可以是用户登录的ID、uid、特殊标识
$bsid = $message['bsid'];
//保存
$_SESSION['bs_id'] = $bsid;
$_SESSION['bs_name'] = 'BS_SHOP_'.rand(1000,10000);
//绑定UID到当前登录的id,方便推送消息
// 将当前链接与uid绑定
Gateway::bindUid($client_id, $bsid);
// 通知当前客户端初始化
$message_data = array(
'type' => 'login',
'code' => 0,
'message' =>'success',
'time'=>time(),
'data'=>[
]
);
//给当前登录的id发送消息
// Gateway::sendToClient($client_id, json_encode($message_data,JSON_UNESCAPED_UNICODE));
//给所有在线用户发送消息
/*$message_data = array(
'type' => 'info',
'code' => 0,
'message' =>'欢迎新用户登录:'.$bsid,
'time'=>time(),
'data'=>[
]
);
Gateway::sendToAll(json_encode($message_data,JSON_UNESCAPED_UNICODE));*/
break;
return;
case 'ping':
//心跳检测,返回ok
Gateway::sendToClient($client_id, 'pong');
return;
case 'test':
//随机返回句子
$arr = [
1 => '机遇对于有准备的头脑有特别的亲和力。', 2 => '不求与人相比,但求超越自己,要哭就哭出激动的泪水,要笑就笑出成长的性格!', 3 => '在你内心深处,还有无穷的潜力,有一天当你回首看时,你就会知道这绝对是真的。', 4 => '无论你觉得自己多么的了不起,也永远有人比你更强;无论你觉得自己多么的不幸,永远有人比你更加不幸。', 5 => '不要浪费你的生命,在你一定会后悔的地方上。', 6 => '放弃该放弃的是无奈,放弃不该放弃的是无能;不放弃该放弃的是无知,不放弃不该放弃的是执着。', 7 => '不要轻易用过去来衡量生活的幸与不幸!每个人的生命都是可以绽放美丽的,只要你珍惜。', 8 => '千万别迷恋网络游戏,要玩就玩好人生这场大游戏。', 9 => '过错是暂时的遗憾,而错过则是永远的遗憾!', 10 => '人生是个圆,有的人走了一辈子也没有走出命运画出的圆圈,其实,圆上的每一个点都有一条腾飞的切线。'
];
$k = array_rand($arr);
$message_data = array(
'type' => 'text',
'code' => 0,
'message' =>'success',
'time'=>time(),
'data'=>[
'content'=>$arr[$k]
]
);
//给当前登录的id发送消息
Gateway::sendToClient($client_id, json_encode($message_data,JSON_UNESCAPED_UNICODE));
return;
default:
Gateway::sendToClient($client_id, json_encode(self::other_logic($message),JSON_UNESCAPED_UNICODE));
return;
}
return ;
}
//其它逻辑
private static function other_logic($message)
{
$data = array(
'type' => 'unknown',
'code' => 0,
'message' =>'未知消息类型',
'time'=>time(),
'data'=>[
'content' =>'abcccc'
]
);
return $data;
}
/**
* 当客户端连接时触发
* @param int $client_id 连接id
*/
public static function onConnect($client_id)
{
}
/**
* 当连接断开时触发的回调函数
* @param $connection
*/
public static function onClose($client_id)
{
$message_data = array(
'type' => 'info',
'code' => 0,
'message' =>'用户退出:'.$_SESSION['bs_id'],
'data'=>[
'time'=>time()
]
);
Gateway::sendToAll(json_encode($message_data,JSON_UNESCAPED_UNICODE));
}
/**
* 当客户端的连接上发生错误时触发
* @param $connection
* @param $code
* @param $msg
*/
public static function onError($client_id, $code, $msg)
{
echo "error $code $msg\r\n";
}
/**
* 每个进程启动
* @param $worker
*/
public static function onWorkerStart($worker)
{
}
}7.新建test.html(任意地方),内容如下:
<!DOCTYPE html>
<html lang="zh">
<head>
<meta charset="UTF-8">
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<title>测试wesocket</title>
<script src="https://cdn.bootcss.com/jquery/3.4.1/jquery.min.js"></script>
<style>
.abc{
padding: 8px;
}
</style>
</head>
<body>
<div class="btn-group">
<button class="btn abc">发送消息1</button>
<button class="send2" style="padding: 8px;">发送消息2</button>
<div class="res">
<p>下面的返回的消息</p>
<div class="bs-data" style="color: red;"></div>
</div>
</div>
<script>
var ws;//websocket实例
var lockReconnect = false;//避免重复连接
var wsUrl = 'ws:127.0.0.1:8002';
function createWebSocket(url) {
try {
ws = new WebSocket(url);
initEventHandle();
} catch (e) {
reconnect(url);
}
}
function initEventHandle() {
ws.onclose = function () {
reconnect(wsUrl);
};
ws.onerror = function () {
reconnect(wsUrl);
};
ws.onopen = function () {
//心跳检测重置
heartCheck.reset().start();
login();
};
ws.onmessage = function (event) {
//如果获取到消息,心跳检测重置
//拿到任何消息都说明当前连接是正常的
heartCheck.reset().start();
onmessage(event)
}
}
function reconnect(url) {
if(lockReconnect) return;
lockReconnect = true;
//没连接上会一直重连,设置延迟避免请求过多
setTimeout(function () {
createWebSocket(url);
lockReconnect = false;
}, 2000);
}
//心跳检测
var heartCheck = {
timeout: 10000,//10秒
timeoutObj: null,
reset: function(){
clearTimeout(this.timeoutObj);
return this;
},
start: function(){
this.timeoutObj = setTimeout(function(){
//这里发送一个心跳,后端收到后,返回一个心跳消息,
//onmessage拿到返回的心跳就说明连接正常
ws.send('{"type":"ping"}');
}, this.timeout)
}
}
//初始化登录
function login(){
ws.send('{"type":"start","bsid":"bsshoptest"}');
}
//消息接收
function onmessage(e){
console.log('收到消息啦',e.data)
var data = e.data;
if (data==='pong'){
return ;
}
data = $.parseJSON(data);
var content = '';
switch (data.type){
case 'text':
content = data.data.content;
break;
default:
content = data.data.content?data.data.content: '未知';
break;
}
$(".bs-data").html(content)
}
createWebSocket(wsUrl);
$(function (){
$(".btn").on('click',function () {
return ws.send('{"type":"test"}');
});
$(".send2").on('click',function () {
return ws.send('{"type":"abc"}');
});
})
</script>
</body>
</html>8.打开test.html在浏览器测试

附件里面包括了上面截图的核心文件,即workerman目录和push目录下面的文件,不包括composer下载的workerman组件和tp框架。
注意事项:
仅限测试,使用前请自己修改端口、功能逻辑。