经验分享 EOS 中 RPC 的实现过程

yiyanwannian · September 28, 2019 · 329 hits

EOS RPC的实现过程

EOS的RPC主要在http_plugin里面, 虽然在chain_api_plugin, db_size_api_plugin, history_api_plugin 等等都有RPC接口,但其实还是引用的http_plugin模块实现的。本篇文章将对http_plugin中的实现过程一步步讲述。

RPC的启动是在http_plugin::plugin_startup()中,做了Http/Https/Unix的支持,我将主要结构代码简化出来:

void http_plugin::plugin_startup()
{
    my->thread_pool.emplace("http", my->thread_pool_size);
    if (my->listen_endpoint) { ... } //处理http 请求
    if (my->unix_endpoint) { ... } //处理unix 请求
    if (my->https_listen_endpoint) { ... } //处理https 请求
    //添加一个显示目前支持的所有RPC的rpc接口
    add_api({{std::string("/v1/node/get_supported_apis"), 
              [&](string, string body, url_response_callback cb) .... }});
}

跟进去看,发现他们最终都是调用的 websocketpp::lib::asio 来启动端口服务监听

my->server.listen(*my->listen_endpoint);  //http
my->unix_server.listen(*my->unix_endpoint); //unix
my->https_server.listen(*my->https_listen_endpoint); //https

而且都还有一个set_http_handler, 最重要的是,里面都包含一个handle_http_request:

//http/https
 ws.set_http_handler([&](connection_hdl hdl) {
                handle_http_request<detail::asio_with_stub_log<T>>(ws.get_con_from_hdl(hdl));
            });

//unix
my->unix_server.set_http_handler([&, &ioc = my->thread_pool->get_executor()](connection_hdl hdl) {
                my->handle_http_request<detail::asio_local_with_stub_log>(my->unix_server.get_con_from_hdl(hdl));
            });

我们进入再handle_http_request中看看:

template <class T>
void handle_http_request(typename websocketpp::server<T>::connection_ptr con)
{
    try
    {
        auto &req = con->get_request();

        if (!allow_host<T>(req, con))
            return;

        //对http header进行处理
        if (!access_control_allow_origin.empty()) { ... }
        if (!access_control_allow_headers.empty()) { ... }
        if (!access_control_max_age.empty()) { ... }
        if (access_control_allow_credentials) { ... }
        if (req.get_method() == "OPTIONS") { ... }
        con->append_header("Content-type", "application/json");

        if (bytes_in_flight > max_bytes_in_flight)
        {
            //处理传入的body过大
            dlog("503 - too many bytes in flight: ${bytes}", ("bytes", bytes_in_flight.load()));
            ....
        }

        std::string body = con->get_request_body();
        std::string resource = con->get_uri()->get_resource();
        auto handler_itr = url_handlers.find(resource); //注意这里的url_handlers
        if (handler_itr != url_handlers.end())
        {
            //这里是核心
        }
        else
        {
            //这里是核心
        }
    }
    catch (...)
    {
        //处理错误
    }
}

从上面可以看到handle_http_request其实就是RPC的实际处理过程,我对代码做了简化,当然核心部分我不能简化,我在里面标注了出来。后面将讲述核心部分,但是我们在这之前,得看看里面的url_handlers

//url_handlers
map<string, url_handler> url_handlers;
//url_handler
using url_handler = std::function<void(string, string, url_response_callback)>;
//url_response_callback
using url_response_callback = std::function<void(int, fc::variant)>;

url_handlers 其实是重中之重,它是一个map, key是string,存储的就是URL路径, values是url_handler,存储的就是我们具体要执行的rpc的方法的函数指针,url_response_callback也是一个void返回的函数,其实就是我们后面会看到的cb(500, fc::variant(results))这样的方法,将httpcode和results设定到http response中. 还有,怎么向url_handlers添加的rpc呢?其实我们最开始已经看到了:

//就在最开始的时候
add_api({{std::string("/v1/node/get_supported_apis"), 
      [&](string, string body, url_response_callback cb) .... }});

//看看add_api的实现
void add_api(const api_description &api)
{
    for (const auto &call : api)
        add_handler(call.first, call.second);
}

//再看看add_handler的实现
void http_plugin::add_handler(const string &url, const url_handler &handler)
{
    ilog("add api url: ${c}", ("c", url));
    my->url_handlers.insert(std::make_pair(url, handler)); //就是在这里添加进来的
}

在EOS中我们还可以看到大量通过add_api(...)添加进来的RPC,如history_apiplugin.cpp中:

app().get_plugin<http_plugin>().add_api({
   CHAIN_RO_CALL(get_actions), //这里有很多CHAIN_RO_CALL宏定义我们后面展开讲
   CHAIN_RO_CALL(get_transaction),
   CHAIN_RO_CALL(get_key_accounts),
   CHAIN_RO_CALL(get_controlled_accounts)
});

接下来我门接着看刚刚说的核心部分,先看if (handler_itr != url_handlers.end())部分:

//获取request body
std::string body = con->get_request_body();  
//获取URL路径,即url_handlers的key
std::string resource = con->get_uri()->get_resource();
//遍历url_handlers找到对应的url_handlers
auto handler_itr = url_handlers.find(resource);
//如果找到,进行处理
if (handler_itr != url_handlers.end())
{
    con->defer_http_response();
    bytes_in_flight += body.size();
    app().post(appbase::priority::low,
        [&ioc = thread_pool->get_executor(), &bytes_in_flight = this->bytes_in_flight, handler_itr,
        resource{std::move(resource)}, body{std::move(body)}, con]() {
            try
            {
                //handler_itr->second(...)就是执行具体的RPC方法,如get_transaction
                handler_itr->second(resource, body,
                    [&ioc, &bytes_in_flight, con](int code, fc::variant response_body) {
                        //调用的是boost::asio::post(*io_serv, pri_queue.wrap(priority, std::forward<Func>(func)))
                        boost::asio::post(ioc, [response_body{std::move(response_body)}, &bytes_in_flight, con, code]() mutable {
                                //这个方法就是实现url_response_callback,即cb(500, fc::variant(results))的具体实现
                                //这里组装response body
                                std::string json = fc::json::to_string(response_body);
                                response_body.clear();
                                const size_t json_size = json.size();
                                bytes_in_flight += json_size;
                                con->set_body(std::move(json));
                                con->set_status(websocketpp::http::status_code::value(code));
                                con->send_http_response();
                                bytes_in_flight -= json_size;
                            });
                    });
                bytes_in_flight -= body.size();
            }
            catch (...)
            {
                handle_exception<T>(con);
                con->send_http_response();
            }
                });
}
else { ... }

再来看else部分,就是处理请求的URL路径找不到的情况:

dlog("404 - not found: ${ep}", ("ep", resource));
    error_results results{websocketpp::http::status_code::not_found,
                            "Not Found", error_results::error_info(fc::exception(FC_LOG_MESSAGE(error, "Unknown Endpoint")), verbose_http_errors)};
    con->set_body(fc::json::to_string(results));
    con->set_status(websocketpp::http::status_code::not_found);

讲到这里成个RPC实现流程就完成了,还有一个部分没有讲解,就是关于EOS 中 add_api(...)大量使用的时候是怎么做的,其实就是EOS是如何添加RPC接口到nodeos中的,后面我会再写一篇文章讲解。

No Reply at the moment.
需要 Sign In 后方可回复, 如果你还没有账号请点击这里 Sign Up