Netty: Refactoring to make MessageChannelHandler extensible

Small refactorings to make the MessageChannelHandler more extensible.
Also allowed access to the different netty pipelines

This is the fix after the first version had problems with the HTTP
transport due to wrong reusing channel handlers, which is the reason
why tests failed.

Relates #6889
Closes #6915
This commit is contained in:
Alexander Reelsen 2014-07-18 15:38:57 +02:00
parent bbf7e6be92
commit 1816951b6b
5 changed files with 303 additions and 54 deletions

View File

@ -195,7 +195,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
workerCount));
}
serverBootstrap.setPipelineFactory(new MyChannelPipelineFactory(this));
serverBootstrap.setPipelineFactory(configureServerChannelPipelineFactory());
if (tcpNoDelay != null) {
serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
@ -319,13 +319,16 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
}
}
static class MyChannelPipelineFactory implements ChannelPipelineFactory {
public ChannelPipelineFactory configureServerChannelPipelineFactory() {
return new HttpChannelPipelineFactory(this);
}
private final NettyHttpServerTransport transport;
protected static class HttpChannelPipelineFactory implements ChannelPipelineFactory {
private final HttpRequestHandler requestHandler;
protected final NettyHttpServerTransport transport;
protected final HttpRequestHandler requestHandler;
MyChannelPipelineFactory(NettyHttpServerTransport transport) {
public HttpChannelPipelineFactory(NettyHttpServerTransport transport) {
this.transport = transport;
this.requestHandler = new HttpRequestHandler(transport);
}

View File

@ -45,10 +45,10 @@ import java.net.InetSocketAddress;
*/
public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
private final ESLogger logger;
private final ThreadPool threadPool;
private final TransportServiceAdapter transportServiceAdapter;
private final NettyTransport transport;
protected final ESLogger logger;
protected final ThreadPool threadPool;
protected final TransportServiceAdapter transportServiceAdapter;
protected final NettyTransport transport;
public MessageChannelHandler(NettyTransport transport, ESLogger logger) {
this.threadPool = transport.threadPool();
@ -142,7 +142,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
wrappedStream.close();
}
private void handleResponse(Channel channel, StreamInput buffer, final TransportResponseHandler handler) {
protected void handleResponse(Channel channel, StreamInput buffer, final TransportResponseHandler handler) {
final TransportResponse response = handler.newInstance();
response.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
response.remoteAddress();
@ -200,7 +200,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
}
}
private String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
final String action = buffer.readString();
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version);

View File

@ -272,27 +272,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
new NioWorkerPool(Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_client_worker")), workerCount),
new HashedWheelTimer(daemonThreadFactory(settings, "transport_client_timer"))));
}
ChannelPipelineFactory clientPipelineFactory = new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder();
if (maxCumulationBufferCapacity != null) {
if (maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) {
sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE);
} else {
sizeHeader.setMaxCumulationBufferCapacity((int) maxCumulationBufferCapacity.bytes());
}
}
if (maxCompositeBufferComponents != -1) {
sizeHeader.setMaxCumulationBufferComponents(maxCompositeBufferComponents);
}
pipeline.addLast("size", sizeHeader);
pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger));
return pipeline;
}
};
clientBootstrap.setPipelineFactory(clientPipelineFactory);
clientBootstrap.setPipelineFactory(configureClientChannelPipelineFactory());
clientBootstrap.setOption("connectTimeoutMillis", connectTimeout.millis());
if (tcpNoDelay != null) {
clientBootstrap.setOption("tcpNoDelay", tcpNoDelay);
@ -328,28 +308,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_worker")),
workerCount));
}
ChannelPipelineFactory serverPipelineFactory = new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("openChannels", openChannels);
SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder();
if (maxCumulationBufferCapacity != null) {
if (maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) {
sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE);
} else {
sizeHeader.setMaxCumulationBufferCapacity((int) maxCumulationBufferCapacity.bytes());
}
}
if (maxCompositeBufferComponents != -1) {
sizeHeader.setMaxCumulationBufferComponents(maxCompositeBufferComponents);
}
pipeline.addLast("size", sizeHeader);
pipeline.addLast("dispatcher", new MessageChannelHandler(NettyTransport.this, logger));
return pipeline;
}
};
serverBootstrap.setPipelineFactory(serverPipelineFactory);
serverBootstrap.setPipelineFactory(configureServerChannelPipelineFactory());
if (tcpNoDelay != null) {
serverBootstrap.setOption("child.tcpNoDelay", tcpNoDelay);
}
@ -876,6 +835,70 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
return nodeChannels.channel(options.type());
}
public ChannelPipelineFactory configureClientChannelPipelineFactory() {
return new ClientChannelPipelineFactory(this);
}
protected static class ClientChannelPipelineFactory implements ChannelPipelineFactory {
protected NettyTransport nettyTransport;
public ClientChannelPipelineFactory(NettyTransport nettyTransport) {
this.nettyTransport = nettyTransport;
}
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline channelPipeline = Channels.pipeline();
SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder();
if (nettyTransport.maxCumulationBufferCapacity != null) {
if (nettyTransport.maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) {
sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE);
} else {
sizeHeader.setMaxCumulationBufferCapacity((int) nettyTransport.maxCumulationBufferCapacity.bytes());
}
}
if (nettyTransport.maxCompositeBufferComponents != -1) {
sizeHeader.setMaxCumulationBufferComponents(nettyTransport.maxCompositeBufferComponents);
}
channelPipeline.addLast("size", sizeHeader);
channelPipeline.addLast("dispatcher", new MessageChannelHandler(nettyTransport, nettyTransport.logger));
return channelPipeline;
}
}
public ChannelPipelineFactory configureServerChannelPipelineFactory() {
return new ServerChannelPipeFactory(this);
}
protected static class ServerChannelPipeFactory implements ChannelPipelineFactory {
protected NettyTransport nettyTransport;
public ServerChannelPipeFactory(NettyTransport nettyTransport) {
this.nettyTransport = nettyTransport;
}
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline channelPipeline = Channels.pipeline();
channelPipeline.addLast("openChannels", nettyTransport.serverOpenChannels);
SizeHeaderFrameDecoder sizeHeader = new SizeHeaderFrameDecoder();
if (nettyTransport.maxCumulationBufferCapacity != null) {
if (nettyTransport.maxCumulationBufferCapacity.bytes() > Integer.MAX_VALUE) {
sizeHeader.setMaxCumulationBufferCapacity(Integer.MAX_VALUE);
} else {
sizeHeader.setMaxCumulationBufferCapacity((int) nettyTransport.maxCumulationBufferCapacity.bytes());
}
}
if (nettyTransport.maxCompositeBufferComponents != -1) {
sizeHeader.setMaxCumulationBufferComponents(nettyTransport.maxCompositeBufferComponents);
}
channelPipeline.addLast("size", sizeHeader);
channelPipeline.addLast("dispatcher", new MessageChannelHandler(nettyTransport, nettyTransport.logger));
return channelPipeline;
}
}
private class ChannelCloseListener implements ChannelFutureListener {
private final DiscoveryNode node;

View File

@ -0,0 +1,159 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.transport;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.netty.MessageChannelHandler;
import org.elasticsearch.transport.netty.NettyTransport;
import org.elasticsearch.transport.netty.NettyTransportChannel;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
/**
*
*/
public class ConfigurableErrorNettyTransportModule extends AbstractModule {
@Override
protected void configure() {
bind(ExceptionThrowingNettyTransport.class).asEagerSingleton();
bind(Transport.class).to(ExceptionThrowingNettyTransport.class).asEagerSingleton();
}
public static final class ExceptionThrowingNettyTransport extends NettyTransport {
@Inject
public ExceptionThrowingNettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version) {
super(settings, threadPool, networkService, bigArrays, version);
}
@Override
public ChannelPipelineFactory configureServerChannelPipelineFactory() {
return new ErrorPipelineFactory(this);
}
private static class ErrorPipelineFactory extends ServerChannelPipeFactory {
private final ESLogger logger;
public ErrorPipelineFactory(ExceptionThrowingNettyTransport exceptionThrowingNettyTransport) {
super(exceptionThrowingNettyTransport);
this.logger = exceptionThrowingNettyTransport.logger;
}
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = super.getPipeline();
pipeline.replace("dispatcher", "dispatcher", new MessageChannelHandler(nettyTransport, logger) {
@Override
protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
final String action = buffer.readString();
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version);
try {
final TransportRequestHandler handler = transportServiceAdapter.handler(action);
if (handler == null) {
throw new ActionNotFoundTransportException(action);
}
final TransportRequest request = handler.newInstance();
request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
request.readFrom(buffer);
if (request.getHeaders() != null && request.getHeaders().containsKey("ERROR")) {
throw new ElasticsearchException((String) request.getHeaders().get("ERROR"));
}
if (handler.executor() == ThreadPool.Names.SAME) {
//noinspection unchecked
handler.messageReceived(request, transportChannel);
} else {
threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action));
}
} catch (Throwable e) {
try {
transportChannel.sendResponse(e);
} catch (IOException e1) {
logger.warn("Failed to send error message back to client for action [" + action + "]", e);
logger.warn("Actual Exception", e1);
}
}
return action;
}
class RequestHandler extends AbstractRunnable {
private final TransportRequestHandler handler;
private final TransportRequest request;
private final NettyTransportChannel transportChannel;
private final String action;
public RequestHandler(TransportRequestHandler handler, TransportRequest request, NettyTransportChannel transportChannel, String action) {
this.handler = handler;
this.request = request;
this.transportChannel = transportChannel;
this.action = action;
}
@SuppressWarnings({"unchecked"})
@Override
public void run() {
try {
handler.messageReceived(request, transportChannel);
} catch (Throwable e) {
if (transport.lifecycleState() == Lifecycle.State.STARTED) {
// we can only send a response transport is started....
try {
transportChannel.sendResponse(e);
} catch (Throwable e1) {
logger.warn("Failed to send error message back to client for action [" + action + "]", e1);
logger.warn("Actual Exception", e);
}
}
}
}
@Override
public boolean isForceExecution() {
return handler.isForceExecution();
}
}
});
return pipeline;
}
}
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.transport;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.transport.TransportModule;
import org.junit.Test;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
/**
*
*/
@ClusterScope(scope = Scope.TEST, numDataNodes = 1)
public class NettyTransportTests extends ElasticsearchIntegrationTest {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
ImmutableSettings.Builder builder = settingsBuilder()
.put("node.mode", "network")
.put(TransportModule.TRANSPORT_TYPE_KEY, ConfigurableErrorNettyTransportModule.class);
return builder.put(super.nodeSettings(nodeOrdinal)).build();
}
@Test
public void testThatConnectionFailsAsIntended() throws Exception {
Client transportClient = internalCluster().transportClient();
ClusterHealthResponse clusterIndexHealths = transportClient.admin().cluster().prepareHealth().get();
assertThat(clusterIndexHealths.getStatus(), is(ClusterHealthStatus.GREEN));
try {
transportClient.admin().cluster().prepareHealth().putHeader("ERROR", "MY MESSAGE").get();
fail("Expected exception, but didnt happen");
} catch (ElasticsearchException e) {
assertThat(e.getMessage(), containsString("MY MESSAGE"));
}
}
}