Google 开发的 Protobuf 会经常被拿来和 Thrift 比较,不过 Protobuf 主要提供序列化/反序列化的功能,而 Thrift 还提供了几个 server 模型,加上可以自动生成代码,大大简化了 RPC 的开发。
TSimpleServer
以 上一篇笔记 的 echo server 为例,一个使用 TSimpleServer 的服务端如下:
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.
#include "EchoServer.h"
#include <protocol/TBinaryProtocol.h>
#include <server/TSimpleServer.h>
#include <transport/TServerSocket.h>
#include <transport/TBufferTransports.h>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using boost::shared_ptr;
using namespace Test;
class EchoServerHandler : virtual public EchoServerIf {
    public:
        EchoServerHandler()
        {
            // Your initialization goes here
        }
        void echo(std::string& _return, const std::string& msg)
        {
            // Your implementation goes here
            _return = msg;
        }
};
int main(int argc, char **argv)
{
    int port = 9090;
    shared_ptr<EchoServerHandler> handler(new EchoServerHandler());
    shared_ptr<TProcessor> processor(new EchoServerProcessor(handler));
    shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
    shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
    shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
    TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
    server.serve();
    return 0;
}TSimpleServer 是最简单的 server 模型,就是单线程阻塞,只能串行处理请求。
TThreadPoolServer
为了应对多个请求,Thrift 还提供了多线程的 server 模型:
#include "EchoServer.h"
#include <thrift/server/TThreadPoolServer.h>
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/concurrency/PosixThreadFactory.h>
#include <protocol/TBinaryProtocol.h>
#include <server/TSimpleServer.h>
#include <transport/TServerSocket.h>
#include <transport/TBufferTransports.h>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::apache::thrift::concurrency;
using boost::shared_ptr;
using namespace Test;
class EchoServerHandler : virtual public EchoServerIf {
    public:
        EchoServerHandler() {
            // Your initialization goes here
        }
        void echo(std::string& _return, const std::string& msg) {
            // Your implementation goes here
            _return = msg;
        }
};
int main(int argc, char **argv)
{
    int port = 9090;
    shared_ptr<EchoServerHandler> handler(new EchoServerHandler());
    shared_ptr<TProcessor> processor(new EchoServerProcessor(handler));
    shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
    shared_ptr<TTransportFactory> transportFactory(new TFramedTransportFactory());
    shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
    int num_threads = sysconf(_SC_NPROCESSORS_CONF);
    shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(num_threads);
    shared_ptr<PosixThreadFactory> threadFactory(new PosixThreadFactory());
    threadManager->threadFactory(threadFactory);
    threadManager->start();
    TThreadPoolServer server(processor, serverTransport, transportFactory, protocolFactory, threadManager);
    server.serve();
    return 0;
}和 TSimpleServer 不同的是,这里多了一个 ThreadManager 对线程进行管理,所有的线程共用一个 handler,要注意再处理代码中加上对临界区的保护。
TNonblockingServer
虽然 TThreadPoolServer 使用了多线程,但是对于 IO 还是阻塞模型,在数据量较大的情况下会有性能瓶颈,因此 Thrift 还提供了另一个多线程的非阻塞模型 TNonblockingServer:
#include "EchoServer.h"
#include <thrift/server/TNonblockingServer.h>
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/concurrency/PosixThreadFactory.h>
#include <protocol/TBinaryProtocol.h>
#include <server/TSimpleServer.h>
#include <transport/TServerSocket.h>
#include <transport/TBufferTransports.h>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::apache::thrift::concurrency;
using boost::shared_ptr;
using namespace Test;
class EchoServerHandler : virtual public EchoServerIf {
    public:
        EchoServerHandler() {
            // Your initialization goes here
        }
        void echo(std::string& _return, const std::string& msg) {
            // Your implementation goes here
            _return = msg;
        }
};
int main(int argc, char **argv)
{
    int port = 9090;
    shared_ptr<EchoServerHandler> handler(new EchoServerHandler());
    shared_ptr<TProcessor> processor(new EchoServerProcessor(handler));
    shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
    int num_threads = sysconf(_SC_NPROCESSORS_CONF);
    shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(num_threads);
    shared_ptr<PosixThreadFactory> threadFactory(new PosixThreadFactory());
    threadManager->threadFactory(threadFactory);
    threadManager->start();
    TNonblockingServer server(processor, protocolFactory, port, threadManager);
    server.serve();
    return 0;
}编译的时候要加上额外的选项 -lthriftnb 和 -levent,从这里可以看出非阻塞的实现使用了 livevent。这里规定了对应的 client 必须使用 TFramedTransport,否则会有以下的报错:
TConnection:workSocket() Negative frame size -2147418111, remote side not using TFramedTransport?