Google 开发的 Protobuf 会经常被拿来和 Thrift 比较,不过 Protobuf 主要提供序列化/反序列化的功能,而 Thrift 还提供了几个 server 模型,加上可以自动生成代码,大大简化了 RPC 的开发。
TSimpleServer
以 上一篇笔记 的 echo server 为例,一个使用 TSimpleServer 的服务端如下:
// This autogenerated skeleton file illustrates how to build a server.
// You should copy it to another filename to avoid overwriting it.
#include "EchoServer.h"
#include <protocol/TBinaryProtocol.h>
#include <server/TSimpleServer.h>
#include <transport/TServerSocket.h>
#include <transport/TBufferTransports.h>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using boost::shared_ptr;
using namespace Test;
// Handler implementing the generated EchoServerIf interface.
// The Thrift runtime calls echo() once per incoming request.
class EchoServerHandler : virtual public EchoServerIf {
public:
    EchoServerHandler() = default;  // stateless — nothing to initialize

    // Copies the client's message into the out-parameter that the
    // generated code serializes back to the caller.
    void echo(std::string& _return, const std::string& msg)
    {
        _return.assign(msg);
    }
};
int main(int argc, char **argv)
{
int port = 9090;
shared_ptr<EchoServerHandler> handler(new EchoServerHandler());
shared_ptr<TProcessor> processor(new EchoServerProcessor(handler));
shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
shared_ptr<TTransportFactory> transportFactory(new TBufferedTransportFactory());
shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
server.serve();
return 0;
}
TSimpleServer 是最简单的 server 模型,就是单线程阻塞,只能串行处理请求。
TThreadPoolServer
为了应对多个请求,Thrift 还提供了多线程的 server 模型:
#include "EchoServer.h"

#include <unistd.h>  // sysconf(_SC_NPROCESSORS_CONF)

#include <protocol/TBinaryProtocol.h>
#include <server/TSimpleServer.h>
#include <thrift/concurrency/PosixThreadFactory.h>
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/server/TThreadPoolServer.h>
#include <transport/TBufferTransports.h>
#include <transport/TServerSocket.h>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::apache::thrift::concurrency;
using boost::shared_ptr;
using namespace Test;
// Request handler shared by every worker thread of the pool server.
// All threads invoke echo() on this single instance, so any mutable
// state added here later must be protected by a lock.
class EchoServerHandler : virtual public EchoServerIf {
public:
    EchoServerHandler()
    {
        // Nothing to initialize; the handler is stateless.
    }

    // Echoes the request payload straight back to the caller via the
    // Thrift out-parameter.
    void echo(std::string& _return, const std::string& msg)
    {
        _return = std::string(msg);
    }
};
// Entry point: multi-threaded blocking Thrift server (TThreadPoolServer).
// A fixed pool of worker threads services connections; because this uses
// TFramedTransportFactory, clients must speak TFramedTransport.
// serve() blocks forever.
int main(int argc, char **argv)
{
    int port = 9090;

    // Business logic; one handler instance is shared by all workers.
    shared_ptr<EchoServerHandler> handler(new EchoServerHandler());
    shared_ptr<TProcessor> processor(new EchoServerProcessor(handler));

    // TCP listener + framed transport + binary protocol.
    shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
    shared_ptr<TTransportFactory> transportFactory(new TFramedTransportFactory());
    shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

    // Size the pool from the CPU count. sysconf() returns a long and yields
    // -1 on failure; the original narrowed it straight into int and could
    // hand ThreadManager a non-positive worker count. Fall back to one
    // worker instead.
    long cpu_count = sysconf(_SC_NPROCESSORS_CONF);
    int num_threads = cpu_count > 0 ? static_cast<int>(cpu_count) : 1;

    shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(num_threads);
    shared_ptr<PosixThreadFactory> threadFactory(new PosixThreadFactory());
    threadManager->threadFactory(threadFactory);
    threadManager->start();

    TThreadPoolServer server(processor, serverTransport, transportFactory, protocolFactory, threadManager);
    server.serve();
    return 0;
}
和 TSimpleServer 不同的是,这里多了一个 ThreadManager 对线程进行管理,所有的线程共用一个 handler,要注意在处理代码中加上对临界区的保护。
TNonblockingServer
虽然 TThreadPoolServer 使用了多线程,但是对于 IO 还是阻塞模型,在数据量较大的情况下会有性能瓶颈,因此 Thrift 还提供了另一个多线程的非阻塞模型 TNonblockingServer:
#include "EchoServer.h"
#include <thrift/server/TNonblockingServer.h>
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/concurrency/PosixThreadFactory.h>
#include <protocol/TBinaryProtocol.h>
#include <server/TSimpleServer.h>
#include <transport/TServerSocket.h>
#include <transport/TBufferTransports.h>
using namespace ::apache::thrift;
using namespace ::apache::thrift::protocol;
using namespace ::apache::thrift::transport;
using namespace ::apache::thrift::server;
using namespace ::apache::thrift::concurrency;
using boost::shared_ptr;
using namespace Test;
// Handler used by the non-blocking server. It is shared across the worker
// pool, so keep it stateless or guard any shared data with a mutex.
class EchoServerHandler : virtual public EchoServerIf {
public:
    EchoServerHandler() {}  // stateless — nothing to set up

    // Writes the incoming message into the out-parameter unchanged.
    void echo(std::string& _return, const std::string& msg)
    {
        _return.clear();
        _return.append(msg);
    }
};
// Entry point: non-blocking, event-driven Thrift server (TNonblockingServer,
// built on libevent). The server manages its own listening socket, so no
// TServerTransport/TTransportFactory is supplied; clients MUST use
// TFramedTransport. serve() blocks forever.
int main(int argc, char **argv)
{
    int port = 9090;

    // Business logic; one handler instance is shared by all worker threads.
    shared_ptr<EchoServerHandler> handler(new EchoServerHandler());
    shared_ptr<TProcessor> processor(new EchoServerProcessor(handler));
    shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

    // sysconf() returns a long and yields -1 on failure; the original
    // narrowed it straight into int and could hand ThreadManager a
    // non-positive worker count. Fall back to one worker instead.
    // NOTE(review): sysconf() requires <unistd.h>, which this snippet's
    // include list does not pull in explicitly — confirm it compiles
    // standalone or add the include.
    long cpu_count = sysconf(_SC_NPROCESSORS_CONF);
    int num_threads = cpu_count > 0 ? static_cast<int>(cpu_count) : 1;

    shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(num_threads);
    shared_ptr<PosixThreadFactory> threadFactory(new PosixThreadFactory());
    threadManager->threadFactory(threadFactory);
    threadManager->start();

    TNonblockingServer server(processor, protocolFactory, port, threadManager);
    server.serve();
    return 0;
}
编译的时候要加上额外的选项 -lthriftnb 和 -levent,从这里可以看出非阻塞的实现使用了 libevent。这里规定了对应的 client 必须使用 TFramedTransport,否则会有以下的报错:
TConnection:workSocket() Negative frame size -2147418111, remote side not using TFramedTransport?