thinkphp融合workerman的两种简单方式,直接在workerman里面用tp的orm功能

Workerman是什么?

Workerman是一款纯PHP开发的开源高性能的PHP 应用容器。

Workerman不是重复造轮子,它不是一个MVC框架,而是一个更底层更通用的服务框架,你可以用它开发tcp代理、梯子代理、做游戏服务器、邮件服务器、ftp服务器、甚至开发一个php版本的redis、php版本的数据库、php版本的nginx、php版本的php-fpm等等。Workerman可以说是PHP领域的一次创新,让开发者彻底摆脱了PHP只能做WEB的束缚。

实际上Workerman类似一个PHP版本的nginx,核心也是多进程+Epoll+非阻塞IO。Workerman每个进程能维持上万并发连接。由于本身常驻内存,不依赖Apache、nginx、php-fpm这些容器,拥有超高的性能。同时支持TCP、UDP、UNIXSOCKET,支持长连接,支持Websocket、HTTP、WSS、HTTPS等通讯协议以及各种自定义协议。拥有定时器、异步socket客户端、异步Redis、异步Http、异步消息队列等众多高性能组件。


在thinkphp里面使用workerman直接composer 引入即可调用,问题在用如何在workerman里面调用tp的ORM功能,官方示例是用几个start.php启动workerman相关服务,那么在Events.php里面怎样使用tp的ORM呢。下面用tp5.0.7,其它版本的差异自行研究哦。


方法一(命令行模式):

①新建几个tp的命令行(linux下就一个即可,windows下面方便调试就多每个服务独立一个)。

image.png

linux里面只要运行ChatAll就可以了。

里面就是调用下面对应的方法,取ChatAll为例(都调用了一次):

<?php

namespace app\command\controller;

use app\service\ChatWorkerService;
use think\console\Command;
use think\console\Input;
use think\console\Output;
use Workerman\Worker;

class ChatAll extends Command
{
    public function configure()
    {
        $this->setName('ChatAll')
            ->setDescription('linux运行所有聊天服务')
            ->addArgument('action', null, null, 'start')//start|stop|restart|reload|status
            ->addArgument('type', null, null);//d|daemon 守护进程
    }

    public function execute(Input $input, Output $output)
    {
        $type = $input->getArgument('type');
        // 标记是全局启动
        define('GLOBAL_START', 1);
        ChatWorkerService::Register();
        ChatWorkerService::BusinessWorker();
        ChatWorkerService::Gateway();
        //以守护模式启动
        //Worker::$daemonize  = true;
        if ($type == 'd' || $type == 'daemon') {
            Worker::$daemonize  = true;
        }
        // 运行所有服务
        Worker::runAll();

    }

}


②新建ChatWorkerService类(按自己的需求就行,我是用于聊天的),里面内容大概这样:

<?php

namespace app\service;

use app\api\controller\chat\Events;
use GatewayWorker\BusinessWorker;
use GatewayWorker\Gateway;
use GatewayWorker\Register;
use Workerman\Worker;

class ChatWorkerService
{
    public static function BusinessWorker()
    {
        $config = config('worker');
        // bussinessWorker 进程
        $worker = new BusinessWorker();
        // worker名称
        $worker->name = 'ChatBusinessWorker';
        $worker::$logFile = RUNTIME_PATH. 'worker.log';
        $worker::$pidFile = RUNTIME_PATH. 'worker.pid';
        // bussinessWorker进程数量
        // 服务注册地址
        $worker->registerAddress = $config['register_address'] . ':' . $config['register_port'];
        $worker->eventHandler = Events::class;// 事件处理类
        // 如果不是在根目录启动,则运行runAll方法
        if (!defined('GLOBAL_START')) {
            Worker::runAll();
        }
    }

    public static function Register()
    {
        $config = config('worker');
        // register 服务必须是text协议
        $register = new Register('text://0.0.0.0:' . $config['register_port']);
        $register->name = 'ChatRegister';
        // 如果不是在根目录启动,则运行runAll方法
        if (!defined('GLOBAL_START')) {
            Worker::runAll();
        }

    }

    public static function Gateway()
    {
        $config = config('worker');
        // gateway 进程
        $gateway = new Gateway(config('worker.gateway_address'));
        // 设置名称,方便status时查看
        $gateway->name = 'ChatGateway';
        // 设置进程数,gateway进程数建议与cpu核数相同
        $gateway->count = $config['count'];
        // 分布式部署时请设置成内网ip(非127.0.0.1)
        $gateway->lanIp = $config['lan_ip'];
        // 内部通讯起始端口。假如$gateway->count=4,起始端口为2300
        // 则一般会使用2300 2301 2302 2303 4个端口作为内部通讯端口
        $gateway->startPort = $config['start_port'];

        // 心跳间隔
        $gateway->pingInterval = $config['ping_interval'];

        $gateway->pingNotResponseLimit = $config['ping_not_response_limit'];
        // 心跳数据(如果为空则前端需要设置心跳后端遍历是否存在)
        $gateway->pingData = '{"type":"ping"}';
        //$gateway->pingData = json_encode(['type' => 'ping','msg'=>'balala']);

        // 服务注册地址
        $gateway->registerAddress = $config['register_address'].':' . $config['register_port'];

        // 如果不是在根目录启动,则运行runAll方法
        if (!defined('GLOBAL_START')) {
            Worker::runAll();
        }

    }
}

配置文件也提供一下吧:

<?php
return [
    'register_port'    => 1236,
    'register_address' => '127.0.0.1', // Gateway内网地址,cluster模式下设置为0
    'lan_ip'           => '127.0.0.1',       // 本机内网IP,用于网关和业务代码通信
    'start_port'       => 2300,              // 起始端口
    'ping_interval'    => 10,                // 心跳间隔
    'ping_not_response_limit' => 1,         // 允许的心跳失败次数,超过这个次数会断开连接
    'ping_data'        => '',                // 心跳数据
    'count'            => 2,                 // 此业务启动的进程数
    'user'             => 'www',             // 运行user
    'group'            => 'www',             // 运行group
    'daemonize'        => false,             // 是否守护进程化运行
    'socket_buffer_size' => 1024 * 1024 * 2, // 单个socket的buffer大小
    'gateway_address'    =>  'Websocket://0.0.0.0:7272',//监听地址
];


这种方式直接可以使用tp的orm,在Events里面爱怎么查询就怎么查询。


运行模式:

linux下面:php think ChatAll start d  (在后台进程方式运行,不加d就是调试模式)

windows:依次执行三个命令行即可(除了ChatALL),为了方便启动我写了个bat脚本:

::@echo off
CHCP 65001

:: 检查并启动 ChatBusinessWorker
tasklist /FI "IMAGENAME eq php.exe" /FI "WINDOWTITLE eq ChatBusinessWorker" | find /I "php.exe" >nul
if errorlevel 1 (
    start "ChatBusinessWorker" cmd /k "php think ChatBusinessWorker"
)

:: 检查并启动 ChatGateway
tasklist /FI "IMAGENAME eq php.exe" /FI "WINDOWTITLE eq ChatGateway" | find /I "php.exe" >nul
if errorlevel 1 (
    start "ChatGateway" cmd /k "php think ChatGateway"
)

:: 检查并启动 ChatRegister
tasklist /FI "IMAGENAME eq php.exe" /FI "WINDOWTITLE eq ChatRegister" | find /I "php.exe" >nul
if errorlevel 1 (
    start "ChatRegister" cmd /k "php think ChatRegister"
)


方法二(多个独立start_*.php方式):

这种也是官方的方式,官方没有说怎么调用tp的ORM,我是演示怎么用TP框架ORM的。

直接下载workerman(你也可以用composer方式安装到tp目录,但是当引入TP的时候其他第三类库可能冲突)放到项目某个目录(我和application同级加上chat目录)

image.png

②修改三个start_*.php,里面的配置是测试用到,自己可以定义一个config.php,引入就行了。

<?php

use \Workerman\Worker;
use \GatewayWorker\BusinessWorker;

require_once __DIR__ . '/../vendor/autoload.php';
// bussinessWorker 进程
$worker = new BusinessWorker();
// worker名称
$worker->name = 'ChatBusinessWorker';
// bussinessWorker进程数量
$worker->count = 2;
// 服务注册地址
$worker->registerAddress = '127.0.0.1:1239';
!defined('RUNTIME_PATH') && define('RUNTIME_PATH', __DIR__ . '/../../runtime/');

$worker::$logFile = RUNTIME_PATH . 'worker.log';
$worker::$pidFile = RUNTIME_PATH . 'worker.pid';
// 如果不是在根目录启动,则运行runAll方法
if (!defined('GLOBAL_START')) {
    Worker::runAll();
}
<?php
use \Workerman\Worker;
use \GatewayWorker\Gateway;
require_once __DIR__ . '/../vendor/autoload.php';
// gateway 进程
$gateway = new Gateway("Websocket://0.0.0.0:7272");
// 设置名称,方便status时查看
$gateway->name = 'ChatGateway';
// 设置进程数,gateway进程数建议与cpu核数相同
$gateway->count = 2;
// 分布式部署时请设置成内网ip(非127.0.0.1)
$gateway->lanIp = '127.0.0.1';
// 内部通讯起始端口。假如$gateway->count=4,起始端口为2300
// 则一般会使用2300 2301 2302 2303 4个端口作为内部通讯端口
$gateway->startPort = 2300;

// 心跳间隔
$gateway->pingInterval = 10;
$gateway->pingNotResponseLimit = 1;
// 心跳数据(如果为空则前端需要设置心跳后端遍历是否存在)
$gateway->pingData = '{"type":"ping"}';


// 服务注册地址
$gateway->registerAddress = '127.0.0.1:1236';

// 如果不是在根目录启动,则运行runAll方法
if(!defined('GLOBAL_START'))
{
    Worker::runAll();
}
<?php
use \Workerman\Worker;
use \GatewayWorker\Register;
require_once __DIR__ . '/../vendor/autoload.php';
// register 服务必须是text协议
$register = new Register('text://0.0.0.0:1239');
$register->name='ChatRegister';
// 如果不是在根目录启动,则运行runAll方法
if(!defined('GLOBAL_START'))
{
    Worker::runAll();
}


③修改Events.php,在里面加载tp框架

<?php


use think\App;

class Events
{
    private static $events;


    public static function onWorkerStart($worker)
    {
        // 初始化ThinkPHP应用环境tp5.0.7(其他tp版本的命令可能有差异哈)
        define('APP_PATH', __DIR__ . '/../../application/');
        define('THINK_PATH', __DIR__ . '/../../thinkphp/');
        require __DIR__ . '/../../thinkphp/base.php';
        // 执行应用
        App::initCommon();//到此即可,千万不要Console::init()!!!;
        self::$events = new \app\api\controller\chat\Events();
        self::$events::onWorkerStart($worker);
    }



    public static function onWebSocketConnect($client_id, $data)
    {
        self:: $events::onWebSocketConnect($client_id, $data);
    }

    public static function onConnect($client_id)
    {
        self:: $events::onConnect($client_id);
    }

    //可以在这里为每一个businessWorker进程做一些清理工作,例如保存一些重要数据等。
    public static function onWorkerStop($businessWorker)
    {
        self:: $events::onWorkerStop($businessWorker);
    }

    /**
     * 有消息时
     * @param int $client_id
     * @param mixed $message
     * @return void|bool
     * @throws Exception
     */
    public static function onMessage($client_id, $message)
    {
        self:: $events::onMessage($client_id, $message);
    }



    /**
     * 当客户端断开连接时
     * @param integer $client_id 客户端id
     */
    public static function onClose($client_id)
    {
        self:: $events::onClose($client_id);
    }
}


这样在\app\api\controller\chat\Events类里面随意调用tp的ORM啦

这种方式的核心:

public static function onWorkerStart($worker)
{
    // 初始化ThinkPHP应用环境
    define('APP_PATH', __DIR__ . '/../../application/');
    define('THINK_PATH', __DIR__ . '/../../thinkphp/');
    require __DIR__ . '/../../thinkphp/base.php';
    // 执行应用
    App::initCommon();
    self::$events = new \app\api\controller\chat\Events();
    self::$events::onWorkerStart($worker);
}


启动方式就很简单了:

linux 下面:php start.php start -d 

windows下面:直接运行start_for_win.bat


\app\api\controller\chat\Events类:

<?php
/**
 * 用于检测业务代码死循环或者长时间阻塞等问题
 * 如果发现业务卡死,可以将下面declare打开(去掉//注释),并执行php start.php reload
 * 然后观察一段时间workerman.log看是否有process_timeout异常
 */
#declare(ticks=1);

/**
 * 聊天主逻辑
 * 主要是处理 onMessage onClose
 */

namespace app\api\controller\chat;

use app\result\PingResult;
use Workerman\Timer;

//修改了之后要重启一下服务!!!!!
//修改了之后要重启一下服务!!!!!
//修改了之后要重启一下服务!!!!!
class Events
{

    public static function onWorkerStart($worker)
    {
        //防止长链接数据库掉链的,30秒ping一次db保持活跃
        Timer::add(30, function () use ($worker) {
            PingResult::ping('chat_service');
        });
    }


    public static function onWebSocketConnect($client_id, $data)
    {
        //ChatLog::add([$client_id, $data], 'onWebSocketConnect', 1);
    }

    public static function onConnect($client_id)
    {

        echo '开始连接: ' . $client_id, PHP_EOL;
        // var_dump($client_id);
    }

    //可以在这里为每一个businessWorker进程做一些清理工作,例如保存一些重要数据等。
    public static function onWorkerStop($businessWorker)
    {
        //清除所有客户端
        // echo '$businessWorker';
    }


   

    /**
     * 有消息时
     * @param int $client_id
     * @param mixed $message
     * @return void|bool
     */
    public static function onMessage($client_id, $message)
    {
        $messageData = @json_decode($message, true);
        if (!$messageData) {
            return;
        }
        switch ($messageData['type']) {
            case 'pong':
                return;
            case 'login':
                break;
            case 'chat':
                break;
        }
    }


 
    /**
     * 当客户端断开连接时
     * @param integer $client_id 客户端id
     */
    public static function onClose($client_id)
    {
        
    }

}


第一种方式supervisord 不太友好(可能与tp版本有关系),第二种方式supervisord 相对友好

评论/留言