Thrift Study Notes (2)

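Protobuf, developed by Google, is often compared with Thrift. Protobuf mainly provides serialization and deserialization, whereas Thrift also ships several server models and generates code automatically, which greatly simplifies RPC development.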

TSimpleServer

Taking the echo server from the previous note as an example, a server built on TSimpleServer looks like this:

// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.

#include "EchoServer.h"
#include <protocol/TBinaryProtocol.h>
#include <server/TSimpleServer.h>
#include <transport/TServerSocket.h>
#include <transport/TBufferTransports.h>

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;

using boost::shared_ptr;

using namespace Test;

class EchoServerHandler : virtual public EchoServerIf {

    public:

        EchoServerHandler()
        {
            // Your initialization goes here
        }

        void echo(std::string& _return, const std::string& msg)
        {
            // Your implementation goes here
            _return = msg;
        }
};

int main(int argc, char **argv)
{
    int port = 9090;

    shared_ptr<EchoServerHandler> handler(new EchoServerHandler());
    shared_ptr<TProcessor> processor(new EchoServerProcessor(handler));
    shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
    shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
    shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

    TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
    server.serve();

    return 0;
}

TSimpleServer is the simplest server model: it is single-threaded and blocking, so it can only process requests serially.

TThreadPoolServer

To handle multiple requests at once, Thrift also provides a multithreaded server model:

#include "EchoServer.h"

#include <thrift/server/TThreadPoolServer.h>

#include <thrift/concurrency/ThreadManager.h>
#include <thrift/concurrency/PosixThreadFactory.h>

#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>

#include <unistd.h>  // sysconf

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::apache::thrift::concurrency;

using boost::shared_ptr;

using namespace Test;

class EchoServerHandler : virtual public EchoServerIf {
    public:
        EchoServerHandler() {
            // Your initialization goes here
        }

        void echo(std::string& _return, const std::string& msg) {
            // Your implementation goes here
            _return = msg;
        }

};

int main(int argc, char **argv)
{
    int port = 9090;

    shared_ptr<EchoServerHandler> handler(new EchoServerHandler());
    shared_ptr<TProcessor> processor(new EchoServerProcessor(handler));
    shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
    shared_ptr<TTransportFactory> transportFactory(new TFramedTransportFactory());
    shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

    int num_threads = sysconf(_SC_NPROCESSORS_CONF);

    shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(num_threads);
    shared_ptr<PosixThreadFactory> threadFactory(new PosixThreadFactory());
    threadManager->threadFactory(threadFactory);
    threadManager->start();

    TThreadPoolServer server(processor, serverTransport, transportFactory, protocolFactory, threadManager);
    server.serve();
    return 0;
}

Unlike TSimpleServer, this version adds a ThreadManager to manage the worker threads. All threads share a single handler, so any critical section in the handler code must be protected.
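The point about a shared handler can be illustrated without Thrift itself. The sketch below is only an illustration under assumptions: CountingEchoHandler and runWorkers are hypothetical names that are not part of the echo service, the call counter is invented to give the handler some shared state, and C++11 std::mutex and std::thread stand in for whatever locking and threading the real server would use.

```cpp
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for a Thrift handler that keeps shared state.
// It is not derived from EchoServerIf so that the sketch builds without Thrift.
class CountingEchoHandler {
public:
    void echo(std::string& _return, const std::string& msg) {
        _return = msg;  // touches only per-call data, no lock needed
        std::lock_guard<std::mutex> lock(mutex_);  // critical section starts here
        ++calls_;
    }

    long calls() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return calls_;
    }

private:
    mutable std::mutex mutex_;
    long calls_ = 0;
};

// Simulates several worker threads that all call into one handler instance,
// then returns the total number of calls the handler recorded.
long runWorkers(int numThreads, int callsPerThread) {
    CountingEchoHandler handler;
    std::vector<std::thread> workers;
    for (int i = 0; i < numThreads; ++i) {
        workers.emplace_back([&handler, callsPerThread] {
            std::string reply;
            for (int j = 0; j < callsPerThread; ++j) {
                handler.echo(reply, "ping");
            }
        });
    }
    for (auto& worker : workers) {
        worker.join();
    }
    return handler.calls();
}
```

With the lock in place, runWorkers(4, 10000) should report exactly 40000 calls. Without it, the increments would race and the total could come up short.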

TNonblockingServer

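Although TThreadPoolServer is multithreaded, its I/O is still blocking, which becomes a performance bottleneck when the data volume is large. Thrift therefore provides another multithreaded model that uses non-blocking I/O, TNonblockingServer: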

#include "EchoServer.h"

#include <thrift/server/TNonblockingServer.h>

#include <thrift/concurrency/ThreadManager.h>
#include <thrift/concurrency/PosixThreadFactory.h>

#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>

#include <unistd.h>  // sysconf

using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::apache::thrift::concurrency;

using boost::shared_ptr;

using namespace Test;

class EchoServerHandler : virtual public EchoServerIf {
    public:
        EchoServerHandler() {
            // Your initialization goes here
        }

        void echo(std::string& _return, const std::string& msg) {
            // Your implementation goes here
            _return = msg;
        }

};

int main(int argc, char **argv)
{
    int port = 9090;

    shared_ptr<EchoServerHandler> handler(new EchoServerHandler());
    shared_ptr<TProcessor> processor(new EchoServerProcessor(handler));
    shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

    int num_threads = sysconf(_SC_NPROCESSORS_CONF);

    shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(num_threads);
    shared_ptr<PosixThreadFactory> threadFactory(new PosixThreadFactory());
    threadManager->threadFactory(threadFactory);
    threadManager->start();

    TNonblockingServer server(processor, protocolFactory, port, threadManager);
    server.serve();

    return 0;
}

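Compiling this version requires the extra options -lthriftnb and -levent, which shows that the non-blocking implementation is built on libevent. TNonblockingServer also requires the client to use TFramedTransport; otherwise the server reports the following error: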

TConnection:workSocket() Negative frame size -2147418111, remote side not using TFramedTransport?
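The odd-looking number in this message can be reproduced by hand. A framed transport prefixes each message with a 4-byte big-endian length, so the server reads the first four bytes it receives as a frame size. The sketch below assumes the client sends an unframed TBinaryProtocol call in strict mode, whose first four bytes are the version marker 0x8001 followed by the message type CALL (1). The function names are hypothetical helpers written for this note, not Thrift APIs.

```cpp
#include <cstdint>
#include <cstring>

// Interprets four bytes as a big-endian signed 32-bit integer,
// the way a framed transport would read a frame-length prefix.
int32_t decodeFrameSize(const unsigned char* bytes) {
    uint32_t raw = (static_cast<uint32_t>(bytes[0]) << 24) |
                   (static_cast<uint32_t>(bytes[1]) << 16) |
                   (static_cast<uint32_t>(bytes[2]) << 8) |
                   static_cast<uint32_t>(bytes[3]);
    int32_t size;
    std::memcpy(&size, &raw, sizeof(size));  // reinterpret the bits as signed
    return size;
}

// Assumption: an unframed strict-mode binary protocol call starts with
// 0x80 0x01 (version 1), 0x00, and 0x01 (message type CALL).
int32_t unframedCallHeaderAsFrameSize() {
    const unsigned char header[4] = {0x80, 0x01, 0x00, 0x01};
    return decodeFrameSize(header);
}
```

Under that assumption, unframedCallHeaderAsFrameSize() evaluates to -2147418111, the same value as in the error message: the server has mistaken the protocol header for a frame length.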
