Workerman是什么?
Workerman是一款纯PHP开发的开源高性能的PHP 应用容器。
Workerman不是重复造轮子,它不是一个MVC框架,而是一个更底层更通用的服务框架,你可以用它开发tcp代理、梯子代理、做游戏服务器、邮件服务器、ftp服务器、甚至开发一个php版本的redis、php版本的数据库、php版本的nginx、php版本的php-fpm等等。Workerman可以说是PHP领域的一次创新,让开发者彻底摆脱了PHP只能做WEB的束缚。
实际上Workerman类似一个PHP版本的nginx,核心也是多进程+Epoll+非阻塞IO。Workerman每个进程能维持上万并发连接。由于本身常驻内存,不依赖Apache、nginx、php-fpm这些容器,拥有超高的性能。同时支持TCP、UDP、UNIXSOCKET,支持长连接,支持Websocket、HTTP、WSS、HTTPS等通讯协议以及各种自定义协议。拥有定时器、异步socket客户端、异步Redis、异步Http、异步消息队列等众多高性能组件。
在thinkphp里面使用workerman直接composer 引入即可调用,问题在用如何在workerman里面调用tp的ORM功能,官方示例是用几个start.php启动workerman相关服务,那么在Events.php里面怎样使用tp的ORM呢。下面用tp5.0.7,其它版本的差异自行研究哦。
方法一(命令行模式):
①新建几个tp的命令行(linux下就一个即可,windows下面方便调试就多每个服务独立一个)。
linux里面只要运行ChatAll就可以了。
里面就是调用下面对应的方法,取ChatAll为例(都调用了一次):
<?php namespace app\command\controller; use app\service\ChatWorkerService; use think\console\Command; use think\console\Input; use think\console\Output; use Workerman\Worker; class ChatAll extends Command { public function configure() { $this->setName('ChatAll') ->setDescription('linux运行所有聊天服务') ->addArgument('action', null, null, 'start')//start|stop|restart|reload|status ->addArgument('type', null, null);//d|daemon 守护进程 } public function execute(Input $input, Output $output) { $type = $input->getArgument('type'); // 标记是全局启动 define('GLOBAL_START', 1); ChatWorkerService::Register(); ChatWorkerService::BusinessWorker(); ChatWorkerService::Gateway(); //以守护模式启动 //Worker::$daemonize = true; if ($type == 'd' || $type == 'daemon') { Worker::$daemonize = true; } // 运行所有服务 Worker::runAll(); } }
②新建ChatWorkerService类(按自己的需求就行,我是用于聊天的),里面内容大概这样:
<?php namespace app\service; use app\api\controller\chat\Events; use GatewayWorker\BusinessWorker; use GatewayWorker\Gateway; use GatewayWorker\Register; use Workerman\Worker; class ChatWorkerService { public static function BusinessWorker() { $config = config('worker'); // bussinessWorker 进程 $worker = new BusinessWorker(); // worker名称 $worker->name = 'ChatBusinessWorker'; $worker::$logFile = RUNTIME_PATH. 'worker.log'; $worker::$pidFile = RUNTIME_PATH. 'worker.pid'; // bussinessWorker进程数量 // 服务注册地址 $worker->registerAddress = $config['register_address'] . ':' . $config['register_port']; $worker->eventHandler = Events::class;// 事件处理类 // 如果不是在根目录启动,则运行runAll方法 if (!defined('GLOBAL_START')) { Worker::runAll(); } } public static function Register() { $config = config('worker'); // register 服务必须是text协议 $register = new Register('text://0.0.0.0:' . $config['register_port']); $register->name = 'ChatRegister'; // 如果不是在根目录启动,则运行runAll方法 if (!defined('GLOBAL_START')) { Worker::runAll(); } } public static function Gateway() { $config = config('worker'); // gateway 进程 $gateway = new Gateway(config('worker.gateway_address')); // 设置名称,方便status时查看 $gateway->name = 'ChatGateway'; // 设置进程数,gateway进程数建议与cpu核数相同 $gateway->count = $config['count']; // 分布式部署时请设置成内网ip(非127.0.0.1) $gateway->lanIp = $config['lan_ip']; // 内部通讯起始端口。假如$gateway->count=4,起始端口为2300 // 则一般会使用2300 2301 2302 2303 4个端口作为内部通讯端口 $gateway->startPort = $config['start_port']; // 心跳间隔 $gateway->pingInterval = $config['ping_interval']; $gateway->pingNotResponseLimit = $config['ping_not_response_limit']; // 心跳数据(如果为空则前端需要设置心跳后端遍历是否存在) $gateway->pingData = '{"type":"ping"}'; //$gateway->pingData = json_encode(['type' => 'ping','msg'=>'balala']); // 服务注册地址 $gateway->registerAddress = $config['register_address'].':' . $config['register_port']; // 如果不是在根目录启动,则运行runAll方法 if (!defined('GLOBAL_START')) { Worker::runAll(); } } }
配置文件也提供一下吧:
<?php return [ 'register_port' => 1236, 'register_address' => '127.0.0.1', // Gateway内网地址,cluster模式下设置为0 'lan_ip' => '127.0.0.1', // 本机内网IP,用于网关和业务代码通信 'start_port' => 2300, // 起始端口 'ping_interval' => 10, // 心跳间隔 'ping_not_response_limit' => 1, // 允许的心跳失败次数,超过这个次数会断开连接 'ping_data' => '', // 心跳数据 'count' => 2, // 此业务启动的进程数 'user' => 'www', // 运行user 'group' => 'www', // 运行group 'daemonize' => false, // 是否守护进程化运行 'socket_buffer_size' => 1024 * 1024 * 2, // 单个socket的buffer大小 'gateway_address' => 'Websocket://0.0.0.0:7272',//监听地址 ];
这种方式直接可以使用tp的orm,在Events里面爱怎么查询就怎么查询。
运行模式:
linux下面:php think ChatAll start d (在后台进程方式运行,不加d就是调试模式)
windows:依次执行三个命令行即可(除了ChatALL),为了方便启动我写了个bat脚本:
::@echo off CHCP 65001 :: 检查并启动 ChatBusinessWorker tasklist /FI "IMAGENAME eq php.exe" /FI "WINDOWTITLE eq ChatBusinessWorker" | find /I "php.exe" >nul if errorlevel 1 ( start "ChatBusinessWorker" cmd /k "php think ChatBusinessWorker" ) :: 检查并启动 ChatGateway tasklist /FI "IMAGENAME eq php.exe" /FI "WINDOWTITLE eq ChatGateway" | find /I "php.exe" >nul if errorlevel 1 ( start "ChatGateway" cmd /k "php think ChatGateway" ) :: 检查并启动 ChatRegister tasklist /FI "IMAGENAME eq php.exe" /FI "WINDOWTITLE eq ChatRegister" | find /I "php.exe" >nul if errorlevel 1 ( start "ChatRegister" cmd /k "php think ChatRegister" )
方法二(多个独立start_*.php方式):
这种也是官方的方式,官方没有说怎么调用tp的ORM,我是演示怎么用TP框架ORM的。
①直接下载workerman(你也可以用composer方式安装到tp目录,但是当引入TP的时候其他第三类库可能冲突)放到项目某个目录(我和application同级加上chat目录)
②修改三个start_*.php,里面的配置是测试用到,自己可以定义一个config.php,引入就行了。
<?php use \Workerman\Worker; use \GatewayWorker\BusinessWorker; require_once __DIR__ . '/../vendor/autoload.php'; // bussinessWorker 进程 $worker = new BusinessWorker(); // worker名称 $worker->name = 'ChatBusinessWorker'; // bussinessWorker进程数量 $worker->count = 2; // 服务注册地址 $worker->registerAddress = '127.0.0.1:1239'; !defined('RUNTIME_PATH') && define('RUNTIME_PATH', __DIR__ . '/../../runtime/'); $worker::$logFile = RUNTIME_PATH . 'worker.log'; $worker::$pidFile = RUNTIME_PATH . 'worker.pid'; // 如果不是在根目录启动,则运行runAll方法 if (!defined('GLOBAL_START')) { Worker::runAll(); }
<?php use \Workerman\Worker; use \GatewayWorker\Gateway; require_once __DIR__ . '/../vendor/autoload.php'; // gateway 进程 $gateway = new Gateway("Websocket://0.0.0.0:7272"); // 设置名称,方便status时查看 $gateway->name = 'ChatGateway'; // 设置进程数,gateway进程数建议与cpu核数相同 $gateway->count = 2; // 分布式部署时请设置成内网ip(非127.0.0.1) $gateway->lanIp = '127.0.0.1'; // 内部通讯起始端口。假如$gateway->count=4,起始端口为2300 // 则一般会使用2300 2301 2302 2303 4个端口作为内部通讯端口 $gateway->startPort = 2300; // 心跳间隔 $gateway->pingInterval = 10; $gateway->pingNotResponseLimit = 1; // 心跳数据(如果为空则前端需要设置心跳后端遍历是否存在) $gateway->pingData = '{"type":"ping"}'; // 服务注册地址 $gateway->registerAddress = '127.0.0.1:1236'; // 如果不是在根目录启动,则运行runAll方法 if(!defined('GLOBAL_START')) { Worker::runAll(); }
<?php use \Workerman\Worker; use \GatewayWorker\Register; require_once __DIR__ . '/../vendor/autoload.php'; // register 服务必须是text协议 $register = new Register('text://0.0.0.0:1239'); $register->name='ChatRegister'; // 如果不是在根目录启动,则运行runAll方法 if(!defined('GLOBAL_START')) { Worker::runAll(); }
③修改Events.php,在里面加载tp框架
<?php use think\App; class Events { private static $events; public static function onWorkerStart($worker) { // 初始化ThinkPHP应用环境tp5.0.7(其他tp版本的命令可能有差异哈) define('APP_PATH', __DIR__ . '/../../application/'); define('THINK_PATH', __DIR__ . '/../../thinkphp/'); require __DIR__ . '/../../thinkphp/base.php'; // 执行应用 App::initCommon();//到此即可,千万不要Console::init()!!!; self::$events = new \app\api\controller\chat\Events(); self::$events::onWorkerStart($worker); } public static function onWebSocketConnect($client_id, $data) { self:: $events::onWebSocketConnect($client_id, $data); } public static function onConnect($client_id) { self:: $events::onConnect($client_id); } //可以在这里为每一个businessWorker进程做一些清理工作,例如保存一些重要数据等。 public static function onWorkerStop($businessWorker) { self:: $events::onWorkerStop($businessWorker); } /** * 有消息时 * @param int $client_id * @param mixed $message * @return void|bool * @throws Exception */ public static function onMessage($client_id, $message) { self:: $events::onMessage($client_id, $message); } /** * 当客户端断开连接时 * @param integer $client_id 客户端id */ public static function onClose($client_id) { self:: $events::onClose($client_id); } }
这样在\app\api\controller\chat\Events类里面随意调用tp的ORM啦
这种方式的核心:
public static function onWorkerStart($worker) { // 初始化ThinkPHP应用环境 define('APP_PATH', __DIR__ . '/../../application/'); define('THINK_PATH', __DIR__ . '/../../thinkphp/'); require __DIR__ . '/../../thinkphp/base.php'; // 执行应用 App::initCommon(); self::$events = new \app\api\controller\chat\Events(); self::$events::onWorkerStart($worker); }
启动方式就很简单了:
linux 下面:php start.php start -d
windows下面:直接运行start_for_win.bat
\app\api\controller\chat\Events类:
<?php /** * 用于检测业务代码死循环或者长时间阻塞等问题 * 如果发现业务卡死,可以将下面declare打开(去掉//注释),并执行php start.php reload * 然后观察一段时间workerman.log看是否有process_timeout异常 */ #declare(ticks=1); /** * 聊天主逻辑 * 主要是处理 onMessage onClose */ namespace app\api\controller\chat; use app\result\PingResult; use Workerman\Timer; //修改了之后要重启一下服务!!!!! //修改了之后要重启一下服务!!!!! //修改了之后要重启一下服务!!!!! class Events { public static function onWorkerStart($worker) { //防止长链接数据库掉链的,30秒ping一次db保持活跃 Timer::add(30, function () use ($worker) { PingResult::ping('chat_service'); }); } public static function onWebSocketConnect($client_id, $data) { //ChatLog::add([$client_id, $data], 'onWebSocketConnect', 1); } public static function onConnect($client_id) { echo '开始连接: ' . $client_id, PHP_EOL; // var_dump($client_id); } //可以在这里为每一个businessWorker进程做一些清理工作,例如保存一些重要数据等。 public static function onWorkerStop($businessWorker) { //清除所有客户端 // echo '$businessWorker'; } /** * 有消息时 * @param int $client_id * @param mixed $message * @return void|bool */ public static function onMessage($client_id, $message) { $messageData = @json_decode($message, true); if (!$messageData) { return; } switch ($messageData['type']) { case 'pong': return; case 'login': break; case 'chat': break; } } /** * 当客户端断开连接时 * @param integer $client_id 客户端id */ public static function onClose($client_id) { } }
第一种方式supervisord 不太友好(可能与tp版本有关系),第二种方式supervisord 相对友好