Transport: Refactor guice startup

* Removed & refactored unused module code
* Allowed to set transports programmatically
* Allow to set the source of the changed transport

Note: The current implementation breaks BWC as you need to specify a concrete
transport now instead of a module if you want to use a different
Transport or HttpServerTransport

Closes #7289
This commit is contained in:
Alexander Reelsen 2014-08-15 09:42:27 +02:00
parent 7aa2d11cdd
commit 247ff7d801
11 changed files with 305 additions and 360 deletions

View File

@ -19,33 +19,50 @@
package org.elasticsearch.http;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.Modules;
import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.http.netty.NettyHttpServerTransportModule;
import org.elasticsearch.http.netty.NettyHttpServerTransport;
import org.elasticsearch.plugins.Plugin;
import static org.elasticsearch.common.Preconditions.checkNotNull;
/**
*
*/
public class HttpServerModule extends AbstractModule implements SpawnModules {
public class HttpServerModule extends AbstractModule {
private final Settings settings;
private final ESLogger logger;
private Class<? extends HttpServerTransport> configuredHttpServerTransport;
private String configuredHttpServerTransportSource;
public HttpServerModule(Settings settings) {
this.settings = settings;
}
@Override
public Iterable<? extends Module> spawnModules() {
return ImmutableList.of(Modules.createModule(settings.getAsClass("http.type", NettyHttpServerTransportModule.class, "org.elasticsearch.http.", "HttpServerTransportModule"), settings));
this.logger = Loggers.getLogger(getClass(), settings);
}
@SuppressWarnings({"unchecked"})
@Override
protected void configure() {
if (configuredHttpServerTransport != null) {
logger.info("Using [{}] as http transport, overridden by [{}]", configuredHttpServerTransport.getName(), configuredHttpServerTransportSource);
bind(HttpServerTransport.class).to(configuredHttpServerTransport).asEagerSingleton();
} else {
Class<? extends HttpServerTransport> defaultHttpServerTransport = NettyHttpServerTransport.class;
Class<? extends HttpServerTransport> httpServerTransport = settings.getAsClass("http.type", defaultHttpServerTransport, "org.elasticsearch.http.", "HttpServerTransport");
bind(HttpServerTransport.class).to(httpServerTransport).asEagerSingleton();
}
bind(HttpServer.class).asEagerSingleton();
}
public void setHttpServerTransport(Class<? extends HttpServerTransport> httpServerTransport, String source) {
checkNotNull(httpServerTransport, "Configured http server transport may not be null");
checkNotNull(source, "Plugin, that changes transport may not be null");
this.configuredHttpServerTransport = httpServerTransport;
this.configuredHttpServerTransportSource = source;
}
}

View File

@ -1,34 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http.netty;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.http.HttpServerTransport;
/**
*
*/
public class NettyHttpServerTransportModule extends AbstractModule {
@Override
protected void configure() {
bind(HttpServerTransport.class).to(NettyHttpServerTransport.class).asEagerSingleton();
}
}

View File

@ -19,48 +19,74 @@
package org.elasticsearch.transport;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Module;
import org.elasticsearch.common.inject.Modules;
import org.elasticsearch.common.inject.SpawnModules;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.local.LocalTransportModule;
import org.elasticsearch.transport.netty.NettyTransportModule;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.transport.local.LocalTransport;
import org.elasticsearch.transport.netty.NettyTransport;
import static org.elasticsearch.common.Preconditions.checkNotNull;
/**
*
*/
public class TransportModule extends AbstractModule implements SpawnModules {
public class TransportModule extends AbstractModule {
private final Settings settings;
public static final String TRANSPORT_TYPE_KEY = "transport.type";
public static final String TRANSPORT_SERVICE_TYPE_KEY = "transport.service.type";
private final ESLogger logger;
private final Settings settings;
private Class<? extends TransportService> configuredTransportService;
private Class<? extends Transport> configuredTransport;
private String configuredTransportServiceSource;
private String configuredTransportSource;
public TransportModule(Settings settings) {
this.settings = settings;
}
@Override
public Iterable<? extends Module> spawnModules() {
Class<? extends Module> defaultTransportModule;
if (DiscoveryNode.localNode(settings)) {
defaultTransportModule = LocalTransportModule.class;
} else {
defaultTransportModule = NettyTransportModule.class;
}
return ImmutableList.of(Modules.createModule(settings.getAsClass(TRANSPORT_TYPE_KEY, defaultTransportModule, "org.elasticsearch.transport.", "TransportModule"), settings));
this.logger = Loggers.getLogger(getClass(), settings);
}
@Override
protected void configure() {
Class<? extends TransportService> transportService = settings.getAsClass(TRANSPORT_SERVICE_TYPE_KEY, TransportService.class, "org.elasticsearch.transport.", "TransportService");
if (!TransportService.class.equals(transportService)) {
bind(TransportService.class).to(transportService).asEagerSingleton();
if (configuredTransportService != null) {
logger.info("Using [{}] as transport service, overridden by [{}]", configuredTransportService.getName(), configuredTransportServiceSource);
bind(TransportService.class).to(configuredTransportService).asEagerSingleton();
} else {
bind(TransportService.class).asEagerSingleton();
Class<? extends TransportService> defaultTransportService = TransportService.class;
Class<? extends TransportService> transportService = settings.getAsClass(TRANSPORT_SERVICE_TYPE_KEY, defaultTransportService, "org.elasticsearch.transport.", "TransportService");
if (!TransportService.class.equals(transportService)) {
bind(TransportService.class).to(transportService).asEagerSingleton();
} else {
bind(TransportService.class).asEagerSingleton();
}
}
if (configuredTransport != null) {
logger.info("Using [{}] as transport, overridden by [{}]", configuredTransport.getName(), configuredTransportSource);
bind(Transport.class).to(configuredTransport).asEagerSingleton();
} else {
Class<? extends Transport> defaultTransport = DiscoveryNode.localNode(settings) ? LocalTransport.class : NettyTransport.class;
Class<? extends Transport> transport = settings.getAsClass(TRANSPORT_TYPE_KEY, defaultTransport, "org.elasticsearch.transport.", "Transport");
bind(Transport.class).to(transport).asEagerSingleton();
}
}
public void setTransportService(Class<? extends TransportService> transportService, String source) {
checkNotNull(transportService, "Configured transport service may not be null");
checkNotNull(source, "Plugin, that changes transport service may not be null");
this.configuredTransportService = transportService;
this.configuredTransportServiceSource = source;
}
public void setTransport(Class<? extends Transport> transport, String source) {
checkNotNull(transport, "Configured transport may not be null");
checkNotNull(source, "Plugin, that changes transport may not be null");
this.configuredTransport = transport;
this.configuredTransportSource = source;
}
}

View File

@ -1,42 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.local;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.Transport;
/**
*
*/
public class LocalTransportModule extends AbstractModule {
private final Settings settings;
public LocalTransportModule(Settings settings) {
this.settings = settings;
}
@Override
protected void configure() {
bind(LocalTransport.class).asEagerSingleton();
bind(Transport.class).to(LocalTransport.class).asEagerSingleton();
}
}

View File

@ -1,42 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.netty;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.Transport;
/**
*
*/
public class NettyTransportModule extends AbstractModule {
private final Settings settings;
public NettyTransportModule(Settings settings) {
this.settings = settings;
}
@Override
protected void configure() {
bind(NettyTransport.class).asEagerSingleton();
bind(Transport.class).to(NettyTransport.class).asEagerSingleton();
}
}

View File

@ -0,0 +1,100 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugins;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.transport.AssertingLocalTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import static org.hamcrest.Matchers.*;
/**
*
*/
@ClusterScope(scope = Scope.SUITE, numDataNodes = 2)
public class PluggableTransportModuleTests extends ElasticsearchIntegrationTest {
public static final AtomicInteger SENT_REQUEST_COUNTER = new AtomicInteger(0);
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return settingsBuilder()
.put("plugin.types", CountingSentRequestsPlugin.class.getName())
.put(super.nodeSettings(nodeOrdinal))
.build();
}
@Test
public void testThatPluginFunctionalityIsLoadedWithoutConfiguration() throws Exception {
for (Transport transport : internalCluster().getInstances(Transport.class)) {
assertThat(transport, instanceOf(CountingAssertingLocalTransport.class));
}
// the cluster node communication on start up is sufficient to increase the counter
// no need to do anything specific
int count = SENT_REQUEST_COUNTER.get();
assertThat("Expected send request counter to be greather than zero", count, is(greaterThan(0)));
// sending a new request via client node will increase the sent requests
internalCluster().clientNodeClient().admin().cluster().prepareHealth().get();
assertThat("Expected send request counter to be greather than zero", SENT_REQUEST_COUNTER.get(), is(greaterThan(count)));
}
public static class CountingSentRequestsPlugin extends AbstractPlugin {
@Override
public String name() {
return "counting-pipelines-plugin";
}
@Override
public String description() {
return "counting-pipelines-plugin";
}
public void onModule(TransportModule transportModule) {
transportModule.setTransport(CountingAssertingLocalTransport.class, this.name());
}
}
public static final class CountingAssertingLocalTransport extends AssertingLocalTransport {
@Inject
public CountingAssertingLocalTransport(Settings settings, ThreadPool threadPool, Version version) {
super(settings, threadPool, version);
}
@Override
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
SENT_REQUEST_COUNTER.incrementAndGet();
super.sendRequest(node, requestId, action, request, options);
}
}
}

View File

@ -31,7 +31,7 @@ import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.discovery.zen.ZenDiscoveryModule;
import org.elasticsearch.transport.TransportModule;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.netty.NettyTransportModule;
import org.elasticsearch.transport.netty.NettyTransport;
import org.junit.Before;
import org.junit.Ignore;
@ -138,7 +138,7 @@ public abstract class ElasticsearchBackwardsCompatIntegrationTest extends Elasti
protected Settings nodeSettings(int nodeOrdinal) {
return ImmutableSettings.builder()
.put(TransportModule.TRANSPORT_TYPE_KEY, NettyTransportModule.class) // run same transport / disco as external
.put(TransportModule.TRANSPORT_TYPE_KEY, NettyTransport.class) // run same transport / disco as external
.put(DiscoveryModule.DISCOVERY_TYPE_KEY, ZenDiscoveryModule.class)
.put("node.mode", "network") // we need network mode for this
.put("gateway.type", "local") // we require local gateway to mimic upgrades of nodes

View File

@ -79,7 +79,7 @@ import org.elasticsearch.test.cache.recycler.MockBigArraysModule;
import org.elasticsearch.test.cache.recycler.MockPageCacheRecyclerModule;
import org.elasticsearch.test.engine.MockEngineModule;
import org.elasticsearch.test.store.MockFSIndexStoreModule;
import org.elasticsearch.test.transport.AssertingLocalTransportModule;
import org.elasticsearch.test.transport.AssertingLocalTransport;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
@ -103,7 +103,6 @@ import static com.carrotsearch.randomizedtesting.RandomizedTest.systemPropertyAs
import static junit.framework.Assert.fail;
import static org.apache.lucene.util.LuceneTestCase.rarely;
import static org.apache.lucene.util.LuceneTestCase.usually;
import static org.elasticsearch.common.settings.ImmutableSettings.EMPTY;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
import static org.elasticsearch.test.ElasticsearchTestCase.assertBusy;
@ -331,7 +330,7 @@ public final class InternalTestCluster extends TestCluster {
builder.put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName());
}
if (isLocalTransportConfigured()) {
builder.put(TransportModule.TRANSPORT_TYPE_KEY, AssertingLocalTransportModule.class.getName());
builder.put(TransportModule.TRANSPORT_TYPE_KEY, AssertingLocalTransport.class.getName());
} else {
builder.put(Transport.TransportSettings.TRANSPORT_TCP_COMPRESS, rarely(random));
}

View File

@ -1,42 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.transport;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.Transport;
/**
*
*/
public class AssertingLocalTransportModule extends AbstractModule {
private final Settings settings;
public AssertingLocalTransportModule(Settings settings) {
this.settings = settings;
}
@Override
protected void configure() {
bind(AssertingLocalTransport.class).asEagerSingleton();
bind(Transport.class).to(AssertingLocalTransport.class).asEagerSingleton();
}
}

View File

@ -1,159 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.transport;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.netty.MessageChannelHandler;
import org.elasticsearch.transport.netty.NettyTransport;
import org.elasticsearch.transport.netty.NettyTransportChannel;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
/**
*
*/
public class ConfigurableErrorNettyTransportModule extends AbstractModule {
@Override
protected void configure() {
bind(ExceptionThrowingNettyTransport.class).asEagerSingleton();
bind(Transport.class).to(ExceptionThrowingNettyTransport.class).asEagerSingleton();
}
public static final class ExceptionThrowingNettyTransport extends NettyTransport {
@Inject
public ExceptionThrowingNettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version) {
super(settings, threadPool, networkService, bigArrays, version);
}
@Override
public ChannelPipelineFactory configureServerChannelPipelineFactory() {
return new ErrorPipelineFactory(this);
}
private static class ErrorPipelineFactory extends ServerChannelPipeFactory {
private final ESLogger logger;
public ErrorPipelineFactory(ExceptionThrowingNettyTransport exceptionThrowingNettyTransport) {
super(exceptionThrowingNettyTransport);
this.logger = exceptionThrowingNettyTransport.logger;
}
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = super.getPipeline();
pipeline.replace("dispatcher", "dispatcher", new MessageChannelHandler(nettyTransport, logger) {
@Override
protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
final String action = buffer.readString();
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version);
try {
final TransportRequestHandler handler = transportServiceAdapter.handler(action, version);
if (handler == null) {
throw new ActionNotFoundTransportException(action);
}
final TransportRequest request = handler.newInstance();
request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
request.readFrom(buffer);
if (request.hasHeader("ERROR")) {
throw new ElasticsearchException((String) request.getHeader("ERROR"));
}
if (handler.executor() == ThreadPool.Names.SAME) {
//noinspection unchecked
handler.messageReceived(request, transportChannel);
} else {
threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action));
}
} catch (Throwable e) {
try {
transportChannel.sendResponse(e);
} catch (IOException e1) {
logger.warn("Failed to send error message back to client for action [" + action + "]", e);
logger.warn("Actual Exception", e1);
}
}
return action;
}
class RequestHandler extends AbstractRunnable {
private final TransportRequestHandler handler;
private final TransportRequest request;
private final NettyTransportChannel transportChannel;
private final String action;
public RequestHandler(TransportRequestHandler handler, TransportRequest request, NettyTransportChannel transportChannel, String action) {
this.handler = handler;
this.request = request;
this.transportChannel = transportChannel;
this.action = action;
}
@SuppressWarnings({"unchecked"})
@Override
public void run() {
try {
handler.messageReceived(request, transportChannel);
} catch (Throwable e) {
if (transport.lifecycleState() == Lifecycle.State.STARTED) {
// we can only send a response transport is started....
try {
transportChannel.sendResponse(e);
} catch (Throwable e1) {
logger.warn("Failed to send error message back to client for action [" + action + "]", e1);
logger.warn("Actual Exception", e);
}
}
}
}
@Override
public boolean isForceExecution() {
return handler.isForceExecution();
}
}
});
return pipeline;
}
}
}
}

View File

@ -19,15 +19,37 @@
package org.elasticsearch.test.transport;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.transport.TransportModule;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.netty.MessageChannelHandler;
import org.elasticsearch.transport.netty.NettyTransport;
import org.elasticsearch.transport.netty.NettyTransportChannel;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
@ -44,7 +66,7 @@ public class NettyTransportTests extends ElasticsearchIntegrationTest {
protected Settings nodeSettings(int nodeOrdinal) {
ImmutableSettings.Builder builder = settingsBuilder()
.put("node.mode", "network")
.put(TransportModule.TRANSPORT_TYPE_KEY, ConfigurableErrorNettyTransportModule.class);
.put(TransportModule.TRANSPORT_TYPE_KEY, ExceptionThrowingNettyTransport.class.getName());
return builder.put(super.nodeSettings(nodeOrdinal)).build();
}
@ -61,4 +83,104 @@ public class NettyTransportTests extends ElasticsearchIntegrationTest {
assertThat(e.getMessage(), containsString("MY MESSAGE"));
}
}
public static final class ExceptionThrowingNettyTransport extends NettyTransport {
@Inject
public ExceptionThrowingNettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version) {
super(settings, threadPool, networkService, bigArrays, version);
}
@Override
public ChannelPipelineFactory configureServerChannelPipelineFactory() {
return new ErrorPipelineFactory(this);
}
private static class ErrorPipelineFactory extends ServerChannelPipeFactory {
private final ESLogger logger;
public ErrorPipelineFactory(ExceptionThrowingNettyTransport exceptionThrowingNettyTransport) {
super(exceptionThrowingNettyTransport);
this.logger = exceptionThrowingNettyTransport.logger;
}
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = super.getPipeline();
pipeline.replace("dispatcher", "dispatcher", new MessageChannelHandler(nettyTransport, logger) {
@Override
protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
final String action = buffer.readString();
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version);
try {
final TransportRequestHandler handler = transportServiceAdapter.handler(action, version);
if (handler == null) {
throw new ActionNotFoundTransportException(action);
}
final TransportRequest request = handler.newInstance();
request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
request.readFrom(buffer);
if (request.hasHeader("ERROR")) {
throw new ElasticsearchException((String) request.getHeader("ERROR"));
}
if (handler.executor() == ThreadPool.Names.SAME) {
//noinspection unchecked
handler.messageReceived(request, transportChannel);
} else {
threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action));
}
} catch (Throwable e) {
try {
transportChannel.sendResponse(e);
} catch (IOException e1) {
logger.warn("Failed to send error message back to client for action [" + action + "]", e);
logger.warn("Actual Exception", e1);
}
}
return action;
}
class RequestHandler extends AbstractRunnable {
private final TransportRequestHandler handler;
private final TransportRequest request;
private final NettyTransportChannel transportChannel;
private final String action;
public RequestHandler(TransportRequestHandler handler, TransportRequest request, NettyTransportChannel transportChannel, String action) {
this.handler = handler;
this.request = request;
this.transportChannel = transportChannel;
this.action = action;
}
@SuppressWarnings({"unchecked"})
@Override
public void run() {
try {
handler.messageReceived(request, transportChannel);
} catch (Throwable e) {
if (transport.lifecycleState() == Lifecycle.State.STARTED) {
// we can only send a response transport is started....
try {
transportChannel.sendResponse(e);
} catch (Throwable e1) {
logger.warn("Failed to send error message back to client for action [" + action + "]", e1);
logger.warn("Actual Exception", e);
}
}
}
}
@Override
public boolean isForceExecution() {
return handler.isForceExecution();
}
}
});
return pipeline;
}
}
}
}