diff --git a/src/main/java/org/elasticsearch/http/HttpServerModule.java b/src/main/java/org/elasticsearch/http/HttpServerModule.java index 9527a06b141..e197c3afbfd 100644 --- a/src/main/java/org/elasticsearch/http/HttpServerModule.java +++ b/src/main/java/org/elasticsearch/http/HttpServerModule.java @@ -19,33 +19,50 @@ package org.elasticsearch.http; -import com.google.common.collect.ImmutableList; import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.inject.Module; -import org.elasticsearch.common.inject.Modules; -import org.elasticsearch.common.inject.SpawnModules; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.http.netty.NettyHttpServerTransportModule; +import org.elasticsearch.http.netty.NettyHttpServerTransport; +import org.elasticsearch.plugins.Plugin; + +import static org.elasticsearch.common.Preconditions.checkNotNull; /** * */ -public class HttpServerModule extends AbstractModule implements SpawnModules { +public class HttpServerModule extends AbstractModule { private final Settings settings; + private final ESLogger logger; + + private Class configuredHttpServerTransport; + private String configuredHttpServerTransportSource; public HttpServerModule(Settings settings) { this.settings = settings; - } - - @Override - public Iterable spawnModules() { - return ImmutableList.of(Modules.createModule(settings.getAsClass("http.type", NettyHttpServerTransportModule.class, "org.elasticsearch.http.", "HttpServerTransportModule"), settings)); + this.logger = Loggers.getLogger(getClass(), settings); } @SuppressWarnings({"unchecked"}) @Override protected void configure() { + if (configuredHttpServerTransport != null) { + logger.info("Using [{}] as http transport, overridden by [{}]", configuredHttpServerTransport.getName(), configuredHttpServerTransportSource); + bind(HttpServerTransport.class).to(configuredHttpServerTransport).asEagerSingleton(); + } else { + Class defaultHttpServerTransport = NettyHttpServerTransport.class; + Class httpServerTransport = settings.getAsClass("http.type", defaultHttpServerTransport, "org.elasticsearch.http.", "HttpServerTransport"); + bind(HttpServerTransport.class).to(httpServerTransport).asEagerSingleton(); + } + bind(HttpServer.class).asEagerSingleton(); } + + public void setHttpServerTransport(Class httpServerTransport, String source) { + checkNotNull(httpServerTransport, "Configured http server transport may not be null"); + checkNotNull(source, "Plugin, that changes transport may not be null"); + this.configuredHttpServerTransport = httpServerTransport; + this.configuredHttpServerTransportSource = source; + } } diff --git a/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransportModule.java b/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransportModule.java deleted file mode 100644 index 40ea150e9a9..00000000000 --- a/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransportModule.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.http.netty; - -import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.http.HttpServerTransport; - -/** - * - */ -public class NettyHttpServerTransportModule extends AbstractModule { - - @Override - protected void configure() { - bind(HttpServerTransport.class).to(NettyHttpServerTransport.class).asEagerSingleton(); - } -} diff --git a/src/main/java/org/elasticsearch/transport/TransportModule.java b/src/main/java/org/elasticsearch/transport/TransportModule.java index 1a8942bd861..6413e70624f 100644 --- a/src/main/java/org/elasticsearch/transport/TransportModule.java +++ b/src/main/java/org/elasticsearch/transport/TransportModule.java @@ -19,48 +19,74 @@ package org.elasticsearch.transport; -import com.google.common.collect.ImmutableList; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.inject.Module; -import org.elasticsearch.common.inject.Modules; -import org.elasticsearch.common.inject.SpawnModules; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.transport.local.LocalTransportModule; -import org.elasticsearch.transport.netty.NettyTransportModule; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.transport.local.LocalTransport; +import org.elasticsearch.transport.netty.NettyTransport; + +import static org.elasticsearch.common.Preconditions.checkNotNull; /** * */ -public class TransportModule extends AbstractModule implements SpawnModules { +public class TransportModule extends AbstractModule { - private final Settings settings; - public static final String TRANSPORT_TYPE_KEY = "transport.type"; public static final String TRANSPORT_SERVICE_TYPE_KEY = "transport.service.type"; + private final ESLogger logger; + private final Settings settings; + + private Class configuredTransportService; + private Class configuredTransport; + private String configuredTransportServiceSource; + private String configuredTransportSource; + public TransportModule(Settings settings) { this.settings = settings; - } - - @Override - public Iterable spawnModules() { - Class defaultTransportModule; - if (DiscoveryNode.localNode(settings)) { - defaultTransportModule = LocalTransportModule.class; - } else { - defaultTransportModule = NettyTransportModule.class; - } - return ImmutableList.of(Modules.createModule(settings.getAsClass(TRANSPORT_TYPE_KEY, defaultTransportModule, "org.elasticsearch.transport.", "TransportModule"), settings)); + this.logger = Loggers.getLogger(getClass(), settings); } @Override protected void configure() { - Class transportService = settings.getAsClass(TRANSPORT_SERVICE_TYPE_KEY, TransportService.class, "org.elasticsearch.transport.", "TransportService"); - if (!TransportService.class.equals(transportService)) { - bind(TransportService.class).to(transportService).asEagerSingleton(); + if (configuredTransportService != null) { + logger.info("Using [{}] as transport service, overridden by [{}]", configuredTransportService.getName(), configuredTransportServiceSource); + bind(TransportService.class).to(configuredTransportService).asEagerSingleton(); } else { - bind(TransportService.class).asEagerSingleton(); + Class defaultTransportService = TransportService.class; + Class transportService = settings.getAsClass(TRANSPORT_SERVICE_TYPE_KEY, defaultTransportService, "org.elasticsearch.transport.", "TransportService"); + if (!TransportService.class.equals(transportService)) { + bind(TransportService.class).to(transportService).asEagerSingleton(); + } else { + bind(TransportService.class).asEagerSingleton(); + } + } + + if (configuredTransport != null) { + logger.info("Using [{}] as transport, overridden by [{}]", configuredTransport.getName(), configuredTransportSource); + bind(Transport.class).to(configuredTransport).asEagerSingleton(); + } else { + Class defaultTransport = DiscoveryNode.localNode(settings) ? LocalTransport.class : NettyTransport.class; + Class transport = settings.getAsClass(TRANSPORT_TYPE_KEY, defaultTransport, "org.elasticsearch.transport.", "Transport"); + bind(Transport.class).to(transport).asEagerSingleton(); } } + + public void setTransportService(Class transportService, String source) { + checkNotNull(transportService, "Configured transport service may not be null"); + checkNotNull(source, "Plugin, that changes transport service may not be null"); + this.configuredTransportService = transportService; + this.configuredTransportServiceSource = source; + } + + public void setTransport(Class transport, String source) { + checkNotNull(transport, "Configured transport may not be null"); + checkNotNull(source, "Plugin, that changes transport may not be null"); + this.configuredTransport = transport; + this.configuredTransportSource = source; + } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/transport/local/LocalTransportModule.java b/src/main/java/org/elasticsearch/transport/local/LocalTransportModule.java deleted file mode 100644 index ea0a44b74b9..00000000000 --- a/src/main/java/org/elasticsearch/transport/local/LocalTransportModule.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.transport.local; - -import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.transport.Transport; - -/** - * - */ -public class LocalTransportModule extends AbstractModule { - - private final Settings settings; - - public LocalTransportModule(Settings settings) { - this.settings = settings; - } - - @Override - protected void configure() { - bind(LocalTransport.class).asEagerSingleton(); - bind(Transport.class).to(LocalTransport.class).asEagerSingleton(); - } -} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransportModule.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransportModule.java deleted file mode 100644 index b96e9eedd05..00000000000 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransportModule.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.transport.netty; - -import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.transport.Transport; - -/** - * - */ -public class NettyTransportModule extends AbstractModule { - - private final Settings settings; - - public NettyTransportModule(Settings settings) { - this.settings = settings; - } - - @Override - protected void configure() { - bind(NettyTransport.class).asEagerSingleton(); - bind(Transport.class).to(NettyTransport.class).asEagerSingleton(); - } -} diff --git a/src/test/java/org/elasticsearch/plugins/PluggableTransportModuleTests.java b/src/test/java/org/elasticsearch/plugins/PluggableTransportModuleTests.java new file mode 100644 index 00000000000..3b201d5110d --- /dev/null +++ b/src/test/java/org/elasticsearch/plugins/PluggableTransportModuleTests.java @@ -0,0 +1,100 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.plugins; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.transport.AssertingLocalTransport; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.*; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; +import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; +import static org.hamcrest.Matchers.*; + +/** + * + */ +@ClusterScope(scope = Scope.SUITE, numDataNodes = 2) +public class PluggableTransportModuleTests extends ElasticsearchIntegrationTest { + + public static final AtomicInteger SENT_REQUEST_COUNTER = new AtomicInteger(0); + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return settingsBuilder() + .put("plugin.types", CountingSentRequestsPlugin.class.getName()) + .put(super.nodeSettings(nodeOrdinal)) + .build(); + } + + @Test + public void testThatPluginFunctionalityIsLoadedWithoutConfiguration() throws Exception { + for (Transport transport : internalCluster().getInstances(Transport.class)) { + assertThat(transport, instanceOf(CountingAssertingLocalTransport.class)); + } + + // the cluster node communication on start up is sufficient to increase the counter + // no need to do anything specific + int count = SENT_REQUEST_COUNTER.get(); + assertThat("Expected send request counter to be greather than zero", count, is(greaterThan(0))); + + // sending a new request via client node will increase the sent requests + internalCluster().clientNodeClient().admin().cluster().prepareHealth().get(); + assertThat("Expected send request counter to be greather than zero", SENT_REQUEST_COUNTER.get(), is(greaterThan(count))); + } + + public static class CountingSentRequestsPlugin extends AbstractPlugin { + @Override + public String name() { + return "counting-pipelines-plugin"; + } + + @Override + public String description() { + return "counting-pipelines-plugin"; + } + + public void onModule(TransportModule transportModule) { + transportModule.setTransport(CountingAssertingLocalTransport.class, this.name()); + } + } + + public static final class CountingAssertingLocalTransport extends AssertingLocalTransport { + + @Inject + public CountingAssertingLocalTransport(Settings settings, ThreadPool threadPool, Version version) { + super(settings, threadPool, version); + } + + @Override + public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + SENT_REQUEST_COUNTER.incrementAndGet(); + super.sendRequest(node, requestId, action, request, options); + } + } +} diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchBackwardsCompatIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchBackwardsCompatIntegrationTest.java index 7b5ef984ba9..678f51cb89e 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchBackwardsCompatIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchBackwardsCompatIntegrationTest.java @@ -31,7 +31,7 @@ import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.zen.ZenDiscoveryModule; import org.elasticsearch.transport.TransportModule; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.transport.netty.NettyTransportModule; +import org.elasticsearch.transport.netty.NettyTransport; import org.junit.Before; import org.junit.Ignore; @@ -138,7 +138,7 @@ public abstract class ElasticsearchBackwardsCompatIntegrationTest extends Elasti protected Settings nodeSettings(int nodeOrdinal) { return ImmutableSettings.builder() - .put(TransportModule.TRANSPORT_TYPE_KEY, NettyTransportModule.class) // run same transport / disco as external + .put(TransportModule.TRANSPORT_TYPE_KEY, NettyTransport.class) // run same transport / disco as external .put(DiscoveryModule.DISCOVERY_TYPE_KEY, ZenDiscoveryModule.class) .put("node.mode", "network") // we need network mode for this .put("gateway.type", "local") // we require local gateway to mimic upgrades of nodes diff --git a/src/test/java/org/elasticsearch/test/InternalTestCluster.java b/src/test/java/org/elasticsearch/test/InternalTestCluster.java index f8e462db60c..9870ae23cd3 100644 --- a/src/test/java/org/elasticsearch/test/InternalTestCluster.java +++ b/src/test/java/org/elasticsearch/test/InternalTestCluster.java @@ -79,7 +79,7 @@ import org.elasticsearch.test.cache.recycler.MockBigArraysModule; import org.elasticsearch.test.cache.recycler.MockPageCacheRecyclerModule; import org.elasticsearch.test.engine.MockEngineModule; import org.elasticsearch.test.store.MockFSIndexStoreModule; -import org.elasticsearch.test.transport.AssertingLocalTransportModule; +import org.elasticsearch.test.transport.AssertingLocalTransport; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; @@ -103,7 +103,6 @@ import static com.carrotsearch.randomizedtesting.RandomizedTest.systemPropertyAs import static junit.framework.Assert.fail; import static org.apache.lucene.util.LuceneTestCase.rarely; import static org.apache.lucene.util.LuceneTestCase.usually; -import static org.elasticsearch.common.settings.ImmutableSettings.EMPTY; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.node.NodeBuilder.nodeBuilder; import static org.elasticsearch.test.ElasticsearchTestCase.assertBusy; @@ -331,7 +330,7 @@ public final class InternalTestCluster extends TestCluster { builder.put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName()); } if (isLocalTransportConfigured()) { - builder.put(TransportModule.TRANSPORT_TYPE_KEY, AssertingLocalTransportModule.class.getName()); + builder.put(TransportModule.TRANSPORT_TYPE_KEY, AssertingLocalTransport.class.getName()); } else { builder.put(Transport.TransportSettings.TRANSPORT_TCP_COMPRESS, rarely(random)); } diff --git a/src/test/java/org/elasticsearch/test/transport/AssertingLocalTransportModule.java b/src/test/java/org/elasticsearch/test/transport/AssertingLocalTransportModule.java deleted file mode 100644 index 47c8ee1746d..00000000000 --- a/src/test/java/org/elasticsearch/test/transport/AssertingLocalTransportModule.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.test.transport; - -import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.transport.Transport; - -/** - * - */ -public class AssertingLocalTransportModule extends AbstractModule { - - private final Settings settings; - - public AssertingLocalTransportModule(Settings settings) { - this.settings = settings; - } - - @Override - protected void configure() { - bind(AssertingLocalTransport.class).asEagerSingleton(); - bind(Transport.class).to(AssertingLocalTransport.class).asEagerSingleton(); - } -} \ No newline at end of file diff --git a/src/test/java/org/elasticsearch/test/transport/ConfigurableErrorNettyTransportModule.java b/src/test/java/org/elasticsearch/test/transport/ConfigurableErrorNettyTransportModule.java deleted file mode 100644 index da2a84fede8..00000000000 --- a/src/test/java/org/elasticsearch/test/transport/ConfigurableErrorNettyTransportModule.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.test.transport; - -import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.Version; -import org.elasticsearch.common.component.Lifecycle; -import org.elasticsearch.common.inject.AbstractModule; -import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.logging.ESLogger; -import org.elasticsearch.common.network.NetworkService; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.InetSocketTransportAddress; -import org.elasticsearch.common.util.BigArrays; -import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.ActionNotFoundTransportException; -import org.elasticsearch.transport.Transport; -import org.elasticsearch.transport.TransportRequest; -import org.elasticsearch.transport.TransportRequestHandler; -import org.elasticsearch.transport.netty.MessageChannelHandler; -import org.elasticsearch.transport.netty.NettyTransport; -import org.elasticsearch.transport.netty.NettyTransportChannel; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; - -import java.io.IOException; -import java.net.InetSocketAddress; - -/** - * - */ -public class ConfigurableErrorNettyTransportModule extends AbstractModule { - - @Override - protected void configure() { - bind(ExceptionThrowingNettyTransport.class).asEagerSingleton(); - bind(Transport.class).to(ExceptionThrowingNettyTransport.class).asEagerSingleton(); - - } - - public static final class ExceptionThrowingNettyTransport extends NettyTransport { - - @Inject - public ExceptionThrowingNettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version) { - super(settings, threadPool, networkService, bigArrays, version); - } - - @Override - public ChannelPipelineFactory configureServerChannelPipelineFactory() { - return new ErrorPipelineFactory(this); - } - - private static class ErrorPipelineFactory extends ServerChannelPipeFactory { - - private final ESLogger logger; - - public ErrorPipelineFactory(ExceptionThrowingNettyTransport exceptionThrowingNettyTransport) { - super(exceptionThrowingNettyTransport); - this.logger = exceptionThrowingNettyTransport.logger; - } - - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline pipeline = super.getPipeline(); - pipeline.replace("dispatcher", "dispatcher", new MessageChannelHandler(nettyTransport, logger) { - - @Override - protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException { - final String action = buffer.readString(); - - final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version); - try { - final TransportRequestHandler handler = transportServiceAdapter.handler(action, version); - if (handler == null) { - throw new ActionNotFoundTransportException(action); - } - final TransportRequest request = handler.newInstance(); - request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress())); - request.readFrom(buffer); - if (request.hasHeader("ERROR")) { - throw new ElasticsearchException((String) request.getHeader("ERROR")); - } - if (handler.executor() == ThreadPool.Names.SAME) { - //noinspection unchecked - handler.messageReceived(request, transportChannel); - } else { - threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action)); - } - } catch (Throwable e) { - try { - transportChannel.sendResponse(e); - } catch (IOException e1) { - logger.warn("Failed to send error message back to client for action [" + action + "]", e); - logger.warn("Actual Exception", e1); - } - } - return action; - } - - class RequestHandler extends AbstractRunnable { - private final TransportRequestHandler handler; - private final TransportRequest request; - private final NettyTransportChannel transportChannel; - private final String action; - - public RequestHandler(TransportRequestHandler handler, TransportRequest request, NettyTransportChannel transportChannel, String action) { - this.handler = handler; - this.request = request; - this.transportChannel = transportChannel; - this.action = action; - } - - @SuppressWarnings({"unchecked"}) - @Override - public void run() { - try { - handler.messageReceived(request, transportChannel); - } catch (Throwable e) { - if (transport.lifecycleState() == Lifecycle.State.STARTED) { - // we can only send a response transport is started.... - try { - transportChannel.sendResponse(e); - } catch (Throwable e1) { - logger.warn("Failed to send error message back to client for action [" + action + "]", e1); - logger.warn("Actual Exception", e); - } - } - } - } - - @Override - public boolean isForceExecution() { - return handler.isForceExecution(); - } - } - }); - return pipeline; - } - } - } -} diff --git a/src/test/java/org/elasticsearch/test/transport/NettyTransportTests.java b/src/test/java/org/elasticsearch/test/transport/NettyTransportTests.java index ed5dab3fde2..0dce7a723bb 100644 --- a/src/test/java/org/elasticsearch/test/transport/NettyTransportTests.java +++ b/src/test/java/org/elasticsearch/test/transport/NettyTransportTests.java @@ -19,15 +19,37 @@ package org.elasticsearch.test.transport; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.client.Client; +import org.elasticsearch.common.component.Lifecycle; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ActionNotFoundTransportException; import org.elasticsearch.transport.TransportModule; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportRequestHandler; +import org.elasticsearch.transport.netty.MessageChannelHandler; +import org.elasticsearch.transport.netty.NettyTransport; +import org.elasticsearch.transport.netty.NettyTransportChannel; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineFactory; import org.junit.Test; +import java.io.IOException; +import java.net.InetSocketAddress; + import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; import static org.elasticsearch.test.ElasticsearchIntegrationTest.Scope; @@ -44,7 +66,7 @@ public class NettyTransportTests extends ElasticsearchIntegrationTest { protected Settings nodeSettings(int nodeOrdinal) { ImmutableSettings.Builder builder = settingsBuilder() .put("node.mode", "network") - .put(TransportModule.TRANSPORT_TYPE_KEY, ConfigurableErrorNettyTransportModule.class); + .put(TransportModule.TRANSPORT_TYPE_KEY, ExceptionThrowingNettyTransport.class.getName()); return builder.put(super.nodeSettings(nodeOrdinal)).build(); } @@ -61,4 +83,104 @@ public class NettyTransportTests extends ElasticsearchIntegrationTest { assertThat(e.getMessage(), containsString("MY MESSAGE")); } } + + public static final class ExceptionThrowingNettyTransport extends NettyTransport { + + @Inject + public ExceptionThrowingNettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version) { + super(settings, threadPool, networkService, bigArrays, version); + } + + @Override + public ChannelPipelineFactory configureServerChannelPipelineFactory() { + return new ErrorPipelineFactory(this); + } + + private static class ErrorPipelineFactory extends ServerChannelPipeFactory { + + private final ESLogger logger; + + public ErrorPipelineFactory(ExceptionThrowingNettyTransport exceptionThrowingNettyTransport) { + super(exceptionThrowingNettyTransport); + this.logger = exceptionThrowingNettyTransport.logger; + } + + public ChannelPipeline getPipeline() throws Exception { + ChannelPipeline pipeline = super.getPipeline(); + pipeline.replace("dispatcher", "dispatcher", new MessageChannelHandler(nettyTransport, logger) { + + @Override + protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException { + final String action = buffer.readString(); + + final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, action, channel, requestId, version); + try { + final TransportRequestHandler handler = transportServiceAdapter.handler(action, version); + if (handler == null) { + throw new ActionNotFoundTransportException(action); + } + final TransportRequest request = handler.newInstance(); + request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress())); + request.readFrom(buffer); + if (request.hasHeader("ERROR")) { + throw new ElasticsearchException((String) request.getHeader("ERROR")); + } + if (handler.executor() == ThreadPool.Names.SAME) { + //noinspection unchecked + handler.messageReceived(request, transportChannel); + } else { + threadPool.executor(handler.executor()).execute(new RequestHandler(handler, request, transportChannel, action)); + } + } catch (Throwable e) { + try { + transportChannel.sendResponse(e); + } catch (IOException e1) { + logger.warn("Failed to send error message back to client for action [" + action + "]", e); + logger.warn("Actual Exception", e1); + } + } + return action; + } + + class RequestHandler extends AbstractRunnable { + private final TransportRequestHandler handler; + private final TransportRequest request; + private final NettyTransportChannel transportChannel; + private final String action; + + public RequestHandler(TransportRequestHandler handler, TransportRequest request, NettyTransportChannel transportChannel, String action) { + this.handler = handler; + this.request = request; + this.transportChannel = transportChannel; + this.action = action; + } + + @SuppressWarnings({"unchecked"}) + @Override + public void run() { + try { + handler.messageReceived(request, transportChannel); + } catch (Throwable e) { + if (transport.lifecycleState() == Lifecycle.State.STARTED) { + // we can only send a response transport is started.... + try { + transportChannel.sendResponse(e); + } catch (Throwable e1) { + logger.warn("Failed to send error message back to client for action [" + action + "]", e1); + logger.warn("Actual Exception", e); + } + } + } + } + + @Override + public boolean isForceExecution() { + return handler.isForceExecution(); + } + } + }); + return pipeline; + } + } + } }