Netty: Refactoring to make MessageChannelHandler extensible

Small refactorings to make the MessageChannelHandler more extensible.
Also allowed access to the different netty pipelines

Closes #6889
This commit is contained in:
Alexander Reelsen 2014-07-17 08:28:55 +02:00
parent 6fe1d9860e
commit db7f0d36af
5 changed files with 287 additions and 101 deletions

View File

@ -195,8 +195,9 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
workerCount));
}
serverBootstrap.setPipelineFactory(new MyChannelPipelineFactory(this));
ChannelPipeline serverChannelPipeline = Channels.pipeline();
configureServerChannelPipeline(serverChannelPipeline);
serverBootstrap.setPipeline(serverChannelPipeline);
if (tcpNoDelay != null) {
serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
}
@ -315,51 +316,36 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
}
}
static class MyChannelPipelineFactory implements ChannelPipelineFactory {
private final NettyHttpServerTransport transport;
private final HttpRequestHandler requestHandler;
MyChannelPipelineFactory(NettyHttpServerTransport transport) {
this.transport = transport;
this.requestHandler = new HttpRequestHandler(transport);
public void configureServerChannelPipeline(ChannelPipeline channelPipeline) {
channelPipeline.addLast("openChannels", this.serverOpenChannels);
HttpRequestDecoder requestDecoder = new HttpRequestDecoder(
(int) this.maxInitialLineLength.bytes(),
(int) this.maxHeaderSize.bytes(),
(int) this.maxChunkSize.bytes()
);
if (this.maxCumulationBufferCapacity != null) {
if (this.maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) {
requestDecoder.setMaxCumulationBufferCapacity(Integer.MAX_VALUE);
} else {
requestDecoder.setMaxCumulationBufferCapacity((int) this.maxCumulationBufferCapacity.bytes());
}
}
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("openChannels", transport.serverOpenChannels);
HttpRequestDecoder requestDecoder = new HttpRequestDecoder(
(int) transport.maxInitialLineLength.bytes(),
(int) transport.maxHeaderSize.bytes(),
(int) transport.maxChunkSize.bytes()
);
if (transport.maxCumulationBufferCapacity != null) {
if (transport.maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) {
requestDecoder.setMaxCumulationBufferCapacity(Integer.MAX_VALUE);
} else {
requestDecoder.setMaxCumulationBufferCapacity((int) transport.maxCumulationBufferCapacity.bytes());
}
}
if (transport.maxCompositeBufferComponents != -1) {
requestDecoder.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
}
pipeline.addLast("decoder", requestDecoder);
if (transport.compression) {
pipeline.addLast("decoder_compress", new HttpContentDecompressor());
}
HttpChunkAggregator httpChunkAggregator = new HttpChunkAggregator((int) transport.maxContentLength.bytes());
if (transport.maxCompositeBufferComponents != -1) {
httpChunkAggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
}
pipeline.addLast("aggregator", httpChunkAggregator);
pipeline.addLast("encoder", new HttpResponseEncoder());
if (transport.compression) {
pipeline.addLast("encoder_compress", new HttpContentCompressor(transport.compressionLevel));
}
pipeline.addLast("handler", requestHandler);
return pipeline;
if (this.maxCompositeBufferComponents != -1) {
requestDecoder.setMaxCumulationBufferComponents(this.maxCompositeBufferComponents);
}
channelPipeline.addLast("decoder", requestDecoder);
if (this.compression) {
channelPipeline.addLast("decoder_compress", new HttpContentDecompressor());
}
HttpChunkAggregator httpChunkAggregator = new HttpChunkAggregator((int) this.maxContentLength.bytes());
if (this.maxCompositeBufferComponents != -1) {
httpChunkAggregator.setMaxCumulationBufferComponents(this.maxCompositeBufferComponents);
}
channelPipeline.addLast("aggregator", httpChunkAggregator);
channelPipeline.addLast("encoder", new HttpResponseEncoder());
if (this.compression) {
channelPipeline.addLast("encoder_compress", new HttpContentCompressor(this.compressionLevel));
}
channelPipeline.addLast("handler", new HttpRequestHandler(this));
}
}

View File

@ -45,10 +45,10 @@ import java.net.InetSocketAddress;
*/
public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
private final ESLogger logger;
private final ThreadPool threadPool;
private final TransportServiceAdapter transportServiceAdapter;
private final NettyTransport transport;
protected final ESLogger logger;
protected final ThreadPool threadPool;
protected final TransportServiceAdapter transportServiceAdapter;
protected final NettyTransport transport;
public MessageChannelHandler(NettyTransport transport, ESLogger logger) {
this.threadPool = transport.threadPool();
@ -142,7 +142,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
wrappedStream.close();
}
private void handleResponse(Channel channel, StreamInput buffer, final TransportResponseHandler handler) {
protected void handleResponse(Channel channel, StreamInput buffer, final TransportResponseHandler handler) {
final TransportResponse response = handler.newInstance();
response.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
response.remoteAddress();
@ -200,7 +200,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
}
}
private String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
final String action = buffer.readString();
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version);

View File

@ -72,10 +72,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.CancelledKeyException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
@ -272,27 +269,9 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
new NioWorkerPool(Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_client_worker")), workerCount),
new HashedWheelTimer(daemonThreadFactory(settings, "transport_client_timer"))));
}
ChannelPipelineFactory clientPipelineFactory = new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder();
if (maxCumulationBufferCapacity != null) {
if (maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) {
sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE);
} else {
sizeHeader.setMaxCumulationBufferCapacity((int) maxCumulationBufferCapacity.bytes());
}
}
if (maxCompositeBufferComponents != -1) {
sizeHeader.setMaxCumulationBufferComponents(maxCompositeBufferComponents);
}
pipeline.addLast("size", sizeHeader);
pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger));
return pipeline;
}
};
clientBootstrap.setPipelineFactory(clientPipelineFactory);
ChannelPipeline clientChannelPipeline = Channels.pipeline();
configureClientChannelPipeline(clientChannelPipeline);
clientBootstrap.setPipeline(clientChannelPipeline);
clientBootstrap.setOption("connectTimeoutMillis", connectTimeout.millis());
if (tcpNoDelay != null) {
clientBootstrap.setOption("tcpNoDelay", tcpNoDelay);
@ -315,8 +294,6 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
return;
}
final OpenChannelsHandler openChannels = new OpenChannelsHandler(logger);
this.serverOpenChannels = openChannels;
if (blockingServer) {
serverBootstrap = new ServerBootstrap(new OioServerSocketChannelFactory(
Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_boss")),
@ -328,28 +305,10 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_worker")),
workerCount));
}
ChannelPipelineFactory serverPipelineFactory = new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("openChannels", openChannels);
SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder();
if (maxCumulationBufferCapacity != null) {
if (maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) {
sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE);
} else {
sizeHeader.setMaxCumulationBufferCapacity((int) maxCumulationBufferCapacity.bytes());
}
}
if (maxCompositeBufferComponents != -1) {
sizeHeader.setMaxCumulationBufferComponents(maxCompositeBufferComponents);
}
pipeline.addLast("size", sizeHeader);
pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger));
return pipeline;
}
};
serverBootstrap.setPipelineFactory(serverPipelineFactory);
ChannelPipeline serverChannelPipeline = Channels.pipeline();
configureServerChannelPipeline(serverChannelPipeline);
serverBootstrap.setPipeline(serverChannelPipeline);
if (tcpNoDelay != null) {
serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
}
@ -412,6 +371,41 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
this.boundAddress = new BoundTransportAddress(new InetSocketTransportAddress(boundAddress), new InetSocketTransportAddress(publishAddress));
}
public void configureClientChannelPipeline(ChannelPipeline channelPipeline) {
SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder();
if (maxCumulationBufferCapacity != null) {
if (maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) {
sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE);
} else {
sizeHeader.setMaxCumulationBufferCapacity((int) maxCumulationBufferCapacity.bytes());
}
}
if (maxCompositeBufferComponents != -1) {
sizeHeader.setMaxCumulationBufferComponents(maxCompositeBufferComponents);
}
channelPipeline.addLast("size", sizeHeader);
channelPipeline.addLast("dispatcher", new MessageChannelHandler(this, logger));
}
public void configureServerChannelPipeline(ChannelPipeline channelPipeline) {
final OpenChannelsHandler openChannels = new OpenChannelsHandler(logger);
this.serverOpenChannels = openChannels;
channelPipeline.addLast("openChannels", openChannels);
SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder();
if (maxCumulationBufferCapacity != null) {
if (maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) {
sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE);
} else {
sizeHeader.setMaxCumulationBufferCapacity((int) maxCumulationBufferCapacity.bytes());
}
}
if (maxCompositeBufferComponents != -1) {
sizeHeader.setMaxCumulationBufferComponents(maxCompositeBufferComponents);
}
channelPipeline.addLast("size", sizeHeader);
channelPipeline.addLast("dispatcher", new MessageChannelHandler(this, logger));
}
@Override
protected void doStop() throws ElasticsearchException {
final CountDownLatch latch = new CountDownLatch(1);

View File

@ -0,0 +1,142 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.transport;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.netty.MessageChannelHandler;
import org.elasticsearch.transport.netty.NettyTransport;
import org.elasticsearch.transport.netty.NettyTransportChannel;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline;
import java.io.IOException;
import java.net.InetSocketAddress;
/**
*
*/
public class ConfigurableErrorNettyTransportModule extends AbstractModule {
@Override
protected void configure() {
bind(ExceptionThrowingNettyTransport.class).asEagerSingleton();
bind(Transport.class).to(ExceptionThrowingNettyTransport.class).asEagerSingleton();
}
public static final class ExceptionThrowingNettyTransport extends NettyTransport {
@Inject
public ExceptionThrowingNettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version) {
super(settings, threadPool, networkService, bigArrays, version);
}
@Override
public void configureServerChannelPipeline(ChannelPipeline channelPipeline) {
super.configureServerChannelPipeline(channelPipeline);
channelPipeline.replace("dispatcher", "dispatcher", new MessageChannelHandler(this, logger) {
@Override
protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
final String action = buffer.readString();
final NettyTransportChannel transportChannel = new NettyTransportChannel(ExceptionThrowingNettyTransport.this, action, channel, requestId, version);
try {
final TransportRequestHandler handler = transportServiceAdapter.handler(action);
if (handler == null) {
throw new ActionNotFoundTransportException(action);
}
final TransportRequest request = handler.newInstance();
request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
request.readFrom(buffer);
if (request.getHeaders() != null && request.getHeaders().containsKey("ERROR")) {
throw new ElasticsearchException((String)request.getHeaders().get("ERROR"));
}
if (handler.executor() == ThreadPool.Names.SAME) {
//noinspection unchecked
handler.messageReceived(request, transportChannel);
} else {
threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action));
}
} catch (Throwable e) {
try {
transportChannel.sendResponse(e);
} catch (IOException e1) {
logger.warn("Failed to send error message back to client for action [" + action + "]", e);
logger.warn("Actual Exception", e1);
}
}
return action;
}
class RequestHandler extends AbstractRunnable {
private final TransportRequestHandler handler;
private final TransportRequest request;
private final NettyTransportChannel transportChannel;
private final String action;
public RequestHandler(TransportRequestHandler handler, TransportRequest request, NettyTransportChannel transportChannel, String action) {
this.handler = handler;
this.request = request;
this.transportChannel = transportChannel;
this.action = action;
}
@SuppressWarnings({"unchecked"})
@Override
public void run() {
try {
handler.messageReceived(request, transportChannel);
} catch (Throwable e) {
if (ExceptionThrowingNettyTransport.this.lifecycleState() == Lifecycle.State.STARTED) {
// we can only send a response transport is started....
try {
transportChannel.sendResponse(e);
} catch (Throwable e1) {
logger.warn("Failed to send error message back to client for action [" + action + "]", e1);
logger.warn("Actual Exception", e);
}
}
}
}
@Override
public boolean isForceExecution() {
return handler.isForceExecution();
}
}
});
}
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.transport;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.transport.TransportModule;
import org.junit.Test;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
/**
*
*/
@ClusterScope(scope = Scope.TEST, numDataNodes = 1)
public class NettyTransportTests extends ElasticsearchIntegrationTest {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
ImmutableSettings.Builder builder = settingsBuilder()
.put("node.mode", "network")
.put(TransportModule.TRANSPORT_TYPE_KEY, ConfigurableErrorNettyTransportModule.class);
return builder.put(super.nodeSettings(nodeOrdinal)).build();
}
@Test
public void testThatConnectionFailsAsIntended() throws Exception {
Client transportClient = internalCluster().transportClient();
ClusterHealthResponse clusterIndexHealths = transportClient.admin().cluster().prepareHealth().get();
assertThat(clusterIndexHealths.getStatus(), is(ClusterHealthStatus.GREEN));
try {
transportClient.admin().cluster().prepareHealth().putHeader("ERROR", "MY MESSAGE").get();
fail("Expected exception, but didnt happen");
} catch (ElasticsearchException e) {
assertThat(e.getMessage(), containsString("MY MESSAGE"));
}
}
}