熟悉 swoole 和 socket 编程的大佬能进来帮忙看看这个问题么? - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
echo404
V2EX    PHP

熟悉 swoole 和 socket 编程的大佬能进来帮忙看看这个问题么?

  •  
  •   echo404 2019-01-29 13:54:10 +08:00 4246 次点击
    这是一个创建于 2452 天前的主题,其中的信息可能已经有所发展或是发生改变。

    1.背景

    我司最近将消息中间件改为了 mqtt,因为 mqtt 的特性(同一个 client_id 不能进行多次连接),导致客户端与 mqtt 服务器之间必须使用一个长连接。一开始我是用队列 while 循环拉取来实现发消息这个操作的。后来闲的蛋疼加上想学点新的东西,就想用 swoole 写个 tcp 服务器,然后就入了坑了....

    2、问题

    现在,我在每个 task 进程中都启动了一个 mqtt 连接,在压测时,这个 mqtt 连接经常会出现 errno=11 (资源不可用)的错误,我查了下相关资料,这个错误出现的原因是因为 mqtt 连接的 socket 写缓冲区满了,消息无法写进写缓冲区。关于这块我很疑惑,task 进程不应该是个单线程的进程么?这样 mqtt 连接一次只处理一个消息,那为什么还会出现写缓冲区满的错误呢? 另外一个问题就是在测试性能时,我用 swoole 写的这个服务器性能竟然和 while 循环拉取队列的性能差不多,差不多都是每秒发送 380 条消息左右。我都要疯了,究竟是我什么地方写的不对,我现在都怀疑人生了!!!!

    3、代码

    <?php namespace swoole; //如果以守护进程启动后,必须使用绝对地址 include_once __DIR__."/mqtt/MessageId.php"; include_once __DIR__."/mqtt/phpMQTT.php"; class TcpServer { private $serv; private $mqtt_config; private $mqtt_connect; const PROCESS_NAME = 'swoole'; public function __construct() { //创建 Server 对象,监听 127.0.0.1:9501 端口 $this->serv = new \swoole_server('0.0.0.0', 9501); //设置属性 $this->serv->set(array( //守护进程化。设置 daemOnize=> 1 时,程序将转入后台作为守护进程运行。长时间运行的服务器端程序必须启用此项。 //启用守护进程后,CWD (当前目录)环境变量的值会发生变更,相对路径的文件读写会出错。PHP 程序中必须使用绝对路径 'daemonize' => true, 'worker_num' => 2, //异步非阻塞代码一般设为 CPU 的 1-4 倍。 'max_request' => 10000, //一个 worker 进程在处理完超过此数值的任务后将自动退出,主要作用是解决 PHP 进程内存溢出问题 'max_conn' => 1024, //进程最大连接数 'task_worker_num' => 20, //Task 进程最大值不得超过 cpu_num*1000,该进程是同步阻塞的,里面不得调用异步 IO 函数 'task_ipc_mode' => 3, //worker 进程与 task 进程之间的通信模式,3 为队列通信并且设置为了争抢模式,使用消息队列通信,如果 Task 进程处理能力低于投递速度,可能会引起 Worker 进程阻塞。 'message_queue_key' => 0x72000100, //指定消息队列的 key 'task_max_request' => 10000, //task 进程最大任务数 'log_file' => '/tmp/swoole.log', //日志文件 'log_level' => 4, //需要记录的错误级别 'tcp_fastopen' => true, //开启 TCP 快连接 )); //监听服务启动事件 $this->serv->on('Start', array($this, 'onStart')); //监听管理进程启动事件 $this->serv->on('ManagerStart', array($this, 'onManagerStart')); //监听工作进程启动事件 $this->serv->on('WorkerStart', array($this, 'onWorkerStart')); //监听工作进程异常退出事件 $this->serv->on('WorkerError', array($this, 'onWorkerError')); //监听工作检测停止事件 $this->serv->on('WorkerStop', array($this, 'onWorkerStop')); //监听连接进入事件 $this->serv->on('Connect', array($this, 'onConnect')); //监听数据接受事件 $this->serv->on('Receive', array($this, 'onReceive')); //监听连接关闭事件 $this->serv->on('Close', array($this, 'onClose')); //监听 task 进程接收任务事件 $this->serv->on('Task', array($this, 'onTask')); //监听 Task 进程完成任务事件 $this->serv->on('Finish', array($this, 'onFinish')); //启动服务器 $this->serv->start(); } //onStart 回调中,仅允许 echo、打印 Log、修改进程名称。不得执行其他操作 public function onStart($serv) { swoole_set_process_name(self::PROCESS_NAME.'_master'); } //在这个回调函数中可以修改管理进程的名称 public function onManagerStart($serv) { swoole_set_process_name(self::PROCESS_NAME.'_manager'); } //此事件在 Worker 进程 /Task 进程启动时发生,这里创建的对象可以在进程生命周期内使用 public function onWorkerStart($serv, $worker_id) { //引入常用函数文件,由于可能会发送更改,所以在 worker 进程开始时引用 include_once __DIR__.'/mqtt/__cron.php'; $jobType = $serv->taskworker ? 'Tasker' : 'Worker'; swoole_set_process_name(self::PROCESS_NAME.'_'.$jobType.'_'.$worker_id); //在 task 进程中启动 mqtt 连接 if ($serv->taskworker) { echo "Task 进程({$worker_id})启动\r\n"; //获取配置 $this->mqtt_cOnfig= get_mqtt_config($worker_id); //连接服务器(这里为了以后能加入多个 mqtt 实例,所以我们将连接放入一个数组中) foreach ($this->mqtt_config as $key=>$item) { $mqtt = get_mqtt($item, 'o2o_mqtt', []); //如果没有连接上 mqtt 服务器,关闭当前进程 if (!$mqtt) { $serv->stop($worker_id, true); } $mqtt_arr[$key] = $mqtt; } $this->mqtt_cOnnect= $mqtt_arr; //30S 发送一次心跳包 $serv->tick(30000, function () use ($serv, $worker_id) { //发送心跳包 foreach ($this->mqtt_config as $key=>$item) { if (!$this->mqtt_connect[$key]['obj']->ping()) { //如果 ping 失败就重新连接 echo "{$item['addr']} ping 失败,退出当前 task 进程($worker_id)\r\n"; $mqtt = get_mqtt($item, 'o2o_mqtt', []); if (!$mqtt) { $serv->stop($worker_id, true); } $this->mqtt_connect[$key] = $mqtt; } } }); } //在 worker 进程判断文件是否更新 if (!$serv->taskworker) { //清除文件状态缓存,这个是为了防止下面 filemtime 从缓存中读取 clearstatcache(); $filemtime = filemtime(__FILE__); //30S 检测一次文件更新 $serv->tick(30000, function () use ($serv, $worker_id, $filemtime) { //检查文件更新 clearstatcache(); //如果文件变化,则重启所有的 work 进程 if ($filemtime != filemtime(__FILE__)) { echo "文件更新,重启所有 woker/task 进程\r\n"; $serv->reload(); } }); } } public function onWorkerError($serv, $worker_id, $worker_pid, $exit_code, $signal) { echo "{$worker_id} Error\r\n"; } //此函数在 Worker 进程中执行 public function onWorkerStop($serv,$worker_id) { //zend_opcache 的 opcache 清理函数,防止某些服务器开启了 opcache opcache_reset(); } //此函数在 Worker 进程中执行 public function onConnect($serv, $fd) { //echo "Client: connect.\n"; } //此函数在 Worker 进程中执行 public function onReceive($serv, $fd, $from_id, $data) { $param['data'] = json_decode($data,true); $param['fd'] = $fd; //向 task 进程投递任务 $serv->task(json_encode($param)); } //此函数在 Task 进程中执行 public function onTask($serv, $task_id, $src_worker_id, $data) { $st = microtime(true); $param = json_decode($data, true); $data = $param['data']; $fd = $param['fd']; $return = ['code' => 2, 'msg' => 'mqtt 消息发送失败']; foreach ($this->mqtt_connect as $key=>$value) { if ($value['minDevNo'] < $data['device_id'] && $value['maxDevNo'] > $data['device_id']) { $res = send_message($value['obj'], $data['mqtt_topic'], $data['message']); if ($res) { $return['code'] = 1; $return['msg'] = 'mqtt 消息发送成功'; echo "接收到数据".$data['message'].', 发往'.$data['mqtt_topic']."成功".time()."\r\n"; }else{ //断线重连 echo "接收到数据".$data['message'].', 发往'.$data['mqtt_topic']."失败,重新连接 mqtt\r\n"; foreach ($this->mqtt_config as $k=>$item) { $mqtt = get_mqtt($item, 'o2o_mqtt', []); if (!$mqtt) { $serv->stop($serv->worker_id, true); } $mqtt_arr[$k] = $mqtt; } $this->mqtt_cOnnect= $mqtt_arr; } } } $res = json_encode($return); $serv->send($fd, $res); $et = microtime(true); echo "任务{$src_worker_id}-{$serv->worker_id}-{$task_id}完成,花费时间".($et-$st)."S\r\n"; return $res; } //此函数在 worker 进程中执行 public function onFinish($serv, $task_id, $data) { //echo "{$task_id}回调完成\r\n"; } public function onClose($server, $fd, $reactorId) { //echo "Client: close.\n"; } } $serv = new TcpServer(); 
    14 条回复    2019-04-03 21:58:00 +08:00
    wo642436249
        1
    wo642436249  
       2019-01-29 16:05:09 +08:00
    干嘛不用协程
    echo404
        2
    echo404  
    OP
       2019-01-29 16:12:46 +08:00
    @wo642436249 刚接触 swoole,异步非阻塞就已经写得很费力了,暂时没有能力去写个协程版的,而且问题应该也不是在这块吧
    puritania
        3
    puritania  
       2019-01-29 16:21:20 +08:00
    所以我选择 golang
    AngryPanda
        4
    AngryPanda  
       2019-01-29 16:22:01 +08:00
    [task 进程不应该是个单线程的进程么?]

    我的理解不是。比如你只开了 2 个 worker,难道只能有 2 个请求被并行处理?那并发数怎么可能上的去呢?
    triptipstop
        5
    triptipstop  
       2019-01-29 16:23:25 +08:00
    写不好 PHP 的才用 Go
    AngryPanda
        6
    AngryPanda  
       2019-01-29 16:27:11 +08:00
    我最近也是第一次用 swoole 来写了一个应用,使用的协程接口。

    性能上的确提升很大,然而写法让人很不习惯。这点比 golang 难用多了。
    echo404
        7
    echo404  
    OP
       2019-01-29 16:34:16 +08:00
    @AngryPanda 测试服务器是我自己的一个 1 核 2G 的小水管,所以按文档中所说,worker 进程是 CPU 数的 1-4 倍,我只开了 2 个进程,每个 worker 进程处理的最大连接数为 1024,2 个进程就同时接收 2048 个请求。超出这个数值之后,如果再有请求去连接这个 TCP 服务器应该会报错误,但是我这边的压测日志中,并没有记录到对应的错误,所以应该还没有到达最大并发量才对
    ferock
        8
    ferock  
    PRO
       2019-01-29 16:37:01 +08:00
    @AngryPanda swoole 研发成本其实并不低,比较起来,还不如用其他语言带来的性能提升来的 “核算”
    AngryPanda
        9
    AngryPanda  
       2019-01-29 16:37:27 +08:00
    @echo404 单线程怎么同时处理这些多连接的请求呢。
    AngryPanda
        10
    AngryPanda  
       2019-01-29 16:39:30 +08:00
    @ferock 的确如此。swoole 的协程还需要配合很多协程客户端来用,这点限制非常大。且和原来的 php 写法差异比较大。
    echo404
        11
    echo404  
    OP
       2019-01-29 16:49:15 +08:00
    @AngryPanda 就我的理解:worker 进程是多线程的,task 进程是单线程的,worker 进程接收到 reactor 进程传递过来的请求之后,将请求投递到 linux 系统自带的队列中去(这个过程是异步的),task 进程就一直读取这个队列中的消息进行处理
    liuxu
        12
    liuxu  
       2019-01-29 17:24:27 +08:00   1
    1 核 2G,不会 1M 带宽吧,压测带宽跑满了吧
    echo404
        13
    echo404  
    OP
       2019-01-30 10:34:25 +08:00
    @liuxu 还真是,把网络这块给忘了
    chdahuzi
        14
    chdahuzi  
       2019-04-03 21:58:00 +08:00
    @AngryPanda swoole4 内置了协程,即便不显示得用协程,每个请求都用到了协程
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     5389 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 42ms UTC 06:03 PVG 14:03 LAX 23:03 JFK 02:03
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86