首页 > php > lumen中用swoole实现RPC服务
2018
06-17

lumen中用swoole实现RPC服务

背景


因老项目众多,既有原生PHP也有TP3.2还有其他一些乱七八糟的项目,导致维护困难,经常同一份代码要写到好几个项目中,copy来copy去,也经常发生改了一个地方,其他地方没改全的问题,因为我们决定做一个小型的微服务项目,通过RPC调用,让后续的项目维护变得简单和高效


可能遇到的问题和风险


  1. 作为基础项目,如果发生宕机或者其他问题,会导致调用的其他项目都发生问题

    因此,他的高可用性就非常重要,应对方案如下:

    我们暂时采用内网负载均衡,搭配多台service来保障高可用

   2. 当该项目响应比较慢时,可能拖累其他项目都变慢

    应对方案如下:

   调用该项目的其他项目,都加上调用的超时时间,如果发现超时或者响应失败,则立刻报警

   服务的访问采用内网的负载均衡,提高响应效率


实战

我们这个项目基于已有的lumen项目,已经集成了laravel-s (一套适配lumen+swoole的开源类库) 

项目地址:  https://github.com/hhxsv5/laravel-s

如何集成: /?id=195

因此我们写一套基于TCP的RPC服务,来给其他项目调用


配置文件修改 config/laravels.php

 'sockets' => [
        [
            'host' => '127.0.0.1',
            'port' => 1314,
            'type' => SWOOLE_SOCK_TCP,// 支持的嵌套字类型:https://wiki.swoole.com/#/consts?id=socket-%e7%b1%bb%e5%9e%8b
            'settings' => [// Swoole可用的配置项:https://wiki.swoole.com/#/server/port?id=%e5%8f%af%e9%80%89%e5%8f%82%e6%95%b0
//                'open_eof_check' => true,
//                'package_eof' => "\r\n",
                'open_length_check' => true,
                'package_max_length' => 81920,
                'package_length_type' => 'N', //see php pack()
                'package_length_offset' => 0,
                'package_body_offset' => 4,
            ],
            'handler' => \App\Http\Controllers\TcpServer::class,
            'enable' => true, // 是否启用,默认为true
        ],
    ],


app/Http/Controllers/TcpServer.php

<?php

namespace App\Http\Controllers;

use Hhxsv5\LaravelS\Swoole\Socket\TcpSocket;
use Swoole\Server;
use App\Rpc\Services\RpcService as RpcService;

class TcpServer extends TcpSocket
{
    public function onConnect(Server $server, $fd, $reactorId)
    {
        \Log::info('New TCP connection', [$fd]);
    }

    public function onReceive(Server $server, $fd, $reactorId, $data)
    {
        \Log::info('Received data', [$fd, $data]);
        $length = unpack("N", $data)[1];
        $msg = substr($data, -$length);
        $msg = json_decode($msg, 1);
        if (!empty($msg)) {
            if (!isset($msg['service']) || !isset($msg['function'])) {
                $server->send($fd, ['error_msg' => 'service or function is required']);
            }
            $result = RpcService::handler($msg);
            \Log::info('result', [$result]);
            $data = json_encode($result);
            $data = pack("N", strlen($data)) . $data;
            $server->send($fd, $data);
        }
    }

    public function onClose(Server $server, $fd, $reactorId)
    {
        \Log::info('Close TCP connection', [$fd]);
        $server->send($fd, 'Goodbye');
    }
}


app/Rpc/Services/RpcService.php

<?php

namespace App\Rpc\Services;

class RpcService
{

    public function __construct()
    {

    }

    /**
     * @service:TestService
     * @function:test
     * @params  参数
     */
    public static function handler($params)
    {
        $service = self::getService($params['service']);
        $serviceMod = new $service;
        $function = $params['function'];
        $args = $params['params'];
        return $serviceMod->$function($args);
    }

    public static function getService($name)
    {
        return $service = __NAMESPACE__ . '\\' . $name;
    }

}


app/Rpc/Services/TestService.php

<?php

namespace App\Rpc\Services;

use Agp\Models\AudioLive;

class TestService
{

    public function __construct()
    {

    }

    public function test($params)
    {
        $service = new AudioLive();
        $result = $service->find(1);
        return $result;
    }

}

代码部署好以后,重启服务,因为我们已经配置了常驻进程,执行如下命令重启

supervisorctl restart api

supervisor配置如下:

[program:api]
command = /usr/local/php/bin/php /data/nginx/wwwbeta/api.aigupiao.com/bin/larave
ls start
directory = /data/nginx/wwwbeta/api.aigupiao.com
numprocs = 1
process_name = %(program_name)s
user= www
autostart = false
autorestart = true
redirect_stderr = true
stdout_logfile = /data/nginx/logs/api.log
stdout_logfile_maxbytes = 50MB
loglevel = info

查看端口监听情况,监听成功则说明启动成功

netstat -lnp |grep 1314

客户端代码 rpc.php

<?php

class RpcClient
{
        protected $client = null;
        protected $host = '127.0.0.1';
        protected $port = 1314;
        protected $package_length_type = 'N';
        protected $package_body_offset = 4;
        protected $package_max_length = 81920;
        protected $service = null;

        public function __construct($service)
        {
                $this->service = $service;
        }

        public function __call($function,$params)
        {
                // 创建一个客户端
                if(empty($this->client)){
                        $this->client = stream_socket_client($this->host.":".$this->port,$errno,$errstr,1);
                        if (!$this->client) {
                                exit("error on stream_socket_client: [$errno] $errstr");
                        }

                }

                // 向服务端发送我们自定义的协议数据
                $proto = [];
                $proto['service'] = $this->service;
                $proto['function'] = $function;
                $proto['params'] = $params;
                $proto = json_encode($proto);
                $msg_length = pack("N" , strlen($proto) ). $proto;
                stream_set_timeout($this->client,1);
                fwrite($this->client, $msg_length);
                // 读取服务端传来的数据

                $buf = fread($this->client, $this->package_max_length);
                $length = unpack($this->package_length_type, $buf)[1];
                $left = $length+$this->package_body_offset-$this->package_max_length;
                if($left>0){
                        $buf .= fread($this->client, $left);
                }
                $data = substr($buf, -$length);
                $info = stream_get_meta_data($this->client);
                fclose($this->client);
                if ($info['timed_out']) {
                        exit("time out");
                }
                return $data;
        }
}

$rpcClient = new RpcClient("TestService");
echo $rpcClient->test(['args'=>1]);

测试: 可以看到我们TestService这个业务查询的数据库结果,说明测试完成

php rpc.php


本文》有 0 条评论

留下一个回复