接上一篇文章《facebook/swift:构建thrift http server(1)》
Netty http server
在为facelog选择XHR实现方案时,我反复看过facebook/swift,了解到它依赖的底层通讯框架是netty,说实话,我之前对netty并不太了解,藉于这次任务需要,我才花时间进一步了解了一下netty是什么。
Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。
@百度百科
我好像明白facebook/swift为什么要基于netty设计了,netty本就是一个高效的异步通讯框架,你可以利用它实现你想要的任何应用协议(当然也包括thrift的通讯协议)。
更进一步了解,才知道netty就已经实现了HTTP协议的编解码:
参见 org.jboss.netty.handler.codec.http.HttpServerCodec类(facebook/swift是基于netty 3.7,所以这里的链接是3.7版本的源码)
而thrift本身是支持JSON格式的数据协议的,参见org.apache.thrift.protocol.TJSONProtocol,对于浏览器前端,JSON协议至关重要。
于是一个基本的想法在我的脑中浮现:
利用netty内置的HTTP协议编解码器(HttpServerCodec )来响应浏览器的XMLHttpRequest (XHR)请求,再将解码出JSON数据通过TJSONProtocol 反序列化成java数据对象就可以传递给thrift服务了,反方向HTTP Respone的也是差不多的反向流程。
哈,似乎很简单,调整一下Netty的协议栈就好了,不需要写很多代码。
简单说,就是实现一个基于netty的thrift http server,而这其中关键的几个部件都是现成的,似乎不需要重新开发新部件。
基于我过往的开发经验,虽然我知道这样想很天真,但这个方案对于我来说,就是完全满足我的要求的方案:不需要新的依赖库,不需要servlet容器。
我之前对netty并不太了解,小白一个,在这个方案中,我最大的成本是熟悉netty框架,熟悉它才知道怎么修改协议栈。(如果你也不了解netty建议你在网上找找这方面的文章,很多,本文不负责netty的入门)
推荐文章: 《Netty源码解读(三)Channel与Pipeline》
NOTE:
阅读下面的技术分析时,建议你最好用你熟悉的IDE(eclipse,IDEA)打开这几个相关的开源框架的源码以便对照查看源码 facebookarchive/nifty, facebookarchive/swift, apache/thrift(java)
NettyServerTransport
下面是com.facebook.nifty.core.NettyServerTransport的构造方法及服务启动方法(start),可以理解为facebook/swift用于启动thrift Server的前的准备工作
public NettyServerTransport(
final ThriftServerDef def,
final NettyServerConfig nettyServerConfig,
final ChannelGroup allChannels)
{
this.def = def;
this.nettyServerConfig = nettyServerConfig;
this.port = def.getServerPort();
this.allChannels = allChannels;
final ConnectionLimiter connectionLimiter = new ConnectionLimiter(def.getMaxConnections());
this.channelStatistics = new ChannelStatistics(allChannels);
this.pipelineFactory = new ChannelPipelineFactory()
{
@Override
public ChannelPipeline getPipeline()
throws Exception
{
ChannelPipeline cp = Channels.pipeline();
TProtocolFactory inputProtocolFactory = def.getDuplexProtocolFactory().getInputProtocolFactory();
NiftySecurityHandlers securityHandlers = def.getSecurityFactory().getSecurityHandlers(def, nettyServerConfig);
cp.addLast("connectionContext", new ConnectionContextHandler());
cp.addLast("connectionLimiter", connectionLimiter);
cp.addLast(ChannelStatistics.NAME, channelStatistics);
cp.addLast("encryptionHandler", securityHandlers.getEncryptionHandler());
cp.addLast("frameCodec", def.getThriftFrameCodecFactory().create(def.getMaxFrameSize(),
inputProtocolFactory));
if (def.getClientIdleTimeout() != null) {
cp.addLast("idleTimeoutHandler", new IdleStateHandler(nettyServerConfig.getTimer(),
def.getClientIdleTimeout().toMillis(),
NO_WRITER_IDLE_TIMEOUT,
NO_ALL_IDLE_TIMEOUT,
TimeUnit.MILLISECONDS));
cp.addLast("idleDisconnectHandler", new IdleDisconnectHandler());
}
cp.addLast("authHandler", securityHandlers.getAuthenticationHandler());
cp.addLast("dispatcher", new NiftyDispatcher(def, nettyServerConfig.getTimer()));
cp.addLast("exceptionLogger", new NiftyExceptionLogger());
return cp;
}
};
}
public void start(ServerChannelFactory serverChannelFactory)
{
if (!(InternalLoggerFactory.getDefaultFactory() instanceof Slf4JLoggerFactory)) {
log.warn("Nifty always logs to slf4j, but netty is currently configured to use a " +
"different logging implementation. To correct this call " +
"InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory()) " +
"during your server's startup");
}
bootstrap = new ServerBootstrap(serverChannelFactory);
bootstrap.setOptions(nettyServerConfig.getBootstrapOptions());
bootstrap.setPipelineFactory(pipelineFactory);
serverChannel = bootstrap.bind(new InetSocketAddress(port));
SocketAddress actualSocket = serverChannel.getLocalAddress();
if (actualSocket instanceof InetSocketAddress) {
int actualPort = ((InetSocketAddress) actualSocket).getPort();
log.info("started transport {}:{} (:{})", def.getName(), actualPort, port);
}
else {
log.info("started transport {}:{}", def.getName(), port);
}
}
从上面的代码中可以看到,在启动thrift server前,向协议管道(ChannelPipeline)添加了很多个ChannelHandler .这其中我们最关注的只有两个分别名为frameCodec 和dispatcher 的ChannelHandler
即下面这两行
cp.addLast("frameCodec", def.getThriftFrameCodecFactory().create(def.getMaxFrameSize(),inputProtocolFactory));
cp.addLast("dispatcher", new NiftyDispatcher(def, nettyServerConfig.getTimer()));
frameCodec 是双向编码器,负责将client端通讯数据的帧数据编解码,而dispatcher 则是将frameCodec 解码的数据传递给封装为ThriftServiceProcessor 的thrift服务实例。只要dispatcher 能正确收到client的请求数据,就成功了一半了。
FrameDecoder
FrameDecoder是netty定义的一个帧数据解码器抽象类。主要的作用就是负责从数据通道(ChannelBuffer,such as TCP/IP)接收数据并解码成指定的格式。具体是什么格式,这要看子类如何实现抽象方法decode了。
DefaultThriftFrameCodec
def.getThriftFrameCodecFactory() 默认提供的是实现二进制传输的com.facebook.nifty.codec.DefaultThriftFrameCodec帧编解码器实例,负责最外层的数据编码解码工作(为什么默认是DefaultThriftFrameCodec ,说来话长,不如你自己看源码)
DefaultThriftFrameCodec 通过DefaultThriftFrameDecoder 实现帧数据解码,用DefaultThriftFrameEncoder 实现数据编码, DefaultThriftFrameDecoder就是FrameDecoder 的一个子类
package com.facebook.nifty.codec;
import org.apache.thrift.protocol.TProtocolFactory;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
public class DefaultThriftFrameCodec implements ThriftFrameCodec
{
private final ThriftFrameDecoder decoder;
private final ThriftFrameEncoder encoder;
public DefaultThriftFrameCodec(int maxFrameSize, TProtocolFactory inputProtocolFactory)
{
this.decoder = new DefaultThriftFrameDecoder(maxFrameSize, inputProtocolFactory);
this.encoder = new DefaultThriftFrameEncoder(maxFrameSize);
}
@Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception
{
encoder.handleDownstream(ctx, e);
}
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception
{
decoder.handleUpstream(ctx, e);
}
}
HttpServerCodec
根据之前看Netty相关的教程,我知道netty已经实现了HTTP的编解码,前面说过了,就是org.jboss.netty.handler.codec.http.HttpServerCodec
HttpServerCodec 通过HttpRequestDecoder 实例HTTP请求的解码,通过HttpResponseEncoder 实现将返回数据编码为HTTP Response
package org.jboss.netty.handler.codec.http;
import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelUpstreamHandler;
public class HttpServerCodec implements ChannelUpstreamHandler,
ChannelDownstreamHandler {
private final HttpRequestDecoder decoder;
private final HttpResponseEncoder encoder = new HttpResponseEncoder();
public HttpServerCodec() {
this(4096, 8192, 8192);
}
public HttpServerCodec(
int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) {
decoder = new HttpRequestDecoder(maxInitialLineLength, maxHeaderSize, maxChunkSize);
}
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
decoder.handleUpstream(ctx, e);
}
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
encoder.handleDownstream(ctx, e);
}
}
小结
通过上面的分析,可以得出结论:如果要thrift 服务实现对HTTP请求的响应,显然不能使用实现thrift二进制协议的DefaultThriftFrameCodec ,而应该实现HTTP协议的编解码。
而netty有提供HTTP协议的编解码器HttpServerCodec ,看来只要把NettyServerTransport 中默认的帧编解码器(frameCodec )从DefaultThriftFrameCodec 换成HttpServerCodec 就可以响应浏览器的XHR请求了。
HttpServerCodec替换DefaultThriftFrameCodec
facebook/swift/swift-service的com.facebook.swift.service.ThriftServer类的作用是将thrift服务实例(封装为NiftyProcessor 接口实例,thrift实现类为ThriftServiceProcessor )进一步封装为一个可以启动的Netty server。
如果希望在ThriftServer 启动时用HttpServerCodec 替换DefaultThriftFrameCodec 需要做如下的工作,
首先要创建一个类ThriftHttpCodec ,将HttpServerCodec 封装为ThriftFrameCodec ,代码如下:
public class ThriftHttpCodec extends HttpServerCodec implements ThriftFrameCodec {
}
再创建一个类ThriftHttpCodecFactory 实现ThriftFrameCodecFactory 接口
public class ThriftHttpCodecFactory implements ThriftFrameCodecFactory {
@Override
public ChannelHandler create(int maxFrameSize, TProtocolFactory defaultProtocolFactory) {
return new ThriftHttpCodec();
}
}
有了ThriftHttpCodec 和ThriftHttpCodecFactory 类就可以在ThriftServer 启动时用ThriftHttpCodec 替换DefaultThriftFrameCodec 。
示例如下:
ImmutableMap<String,TDuplexProtocolFactory> DEFAULT_PROTOCOL_FACTORIES =
ImmutableMap.<String, TDuplexProtocolFactory>builder()
.putAll(ThriftServer.DEFAULT_PROTOCOL_FACTORIES)
.put("json", TDuplexProtocolFactory.fromSingleFactory(new TJSONProtocol.Factory()))
.build();
ImmutableMap<String,ThriftFrameCodecFactory> DEFAULT_FRAME_CODEC_FACTORIES =
ImmutableMap.<String, ThriftFrameCodecFactory>builder()
.putAll(ThriftServer.DEFAULT_FRAME_CODEC_FACTORIES)
.put("http", (ThriftFrameCodecFactory) new ThriftHttpCodecFactory())
.build();
ThriftServerConfig thriftServerConfig = new ThriftServerConfig()
.setPort(26412)
.setTransportName("http")
.setProtocolName("json");
this.processor = new ThriftServiceProcessorCustom(
new ThriftCodecManager(),
checkNotNull(eventHandlers,"eventHandlers is null"),
services);
this.thriftServer = new ThriftServer(processor,
thriftServerConfig,
new NiftyTimer("thrift"),
DEFAULT_FRAME_CODEC_FACTORIES, DEFAULT_PROTOCOL_FACTORIES,
ThriftServer.DEFAULT_WORKER_EXECUTORS,
ThriftServer.DEFAULT_SECURITY_FACTORY);
this.thriftServer.start();
上面这样处理后,ThriftServer 已经是一个支持HTTP响应的thrift http server了。
但这样就结束了么?启动的ThriftServer 就可以正常响应http request了么?NO,NO后面还有坑。。。
写了好长,休息一下,下节再说。
《facebook/swift:构建thrift http server(3)–CORS跨域》 《facebook/swift:构建thrift http server(4)–ThriftXHRDecoder,ThriftXHREncoder》
|
请发表评论