From f30b1f82eea3ae892f33e143c8237f89baf09a95 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 16 Jan 2017 21:06:08 +0100 Subject: [PATCH] Remove HttpServer and HttpServerAdapter in favor of a simple dispatch method (#22636) Today we have quite some abstractions that are essentially providing a simple dispatch method to the plugins defining a `HttpServerTransport`. This commit removes `HttpServer` and `HttpServerAdaptor` and introduces a simple `Dispatcher` functional interface that delegate to `RestController` by default. Relates to #18482 --- .../resources/checkstyle_suppressions.xml | 1 - .../elasticsearch/action/ActionModule.java | 19 +- .../node/info/TransportNodesInfoAction.java | 2 +- .../node/stats/TransportNodesStatsAction.java | 2 +- .../stats/TransportClusterStatsAction.java | 2 +- .../ingest/DeletePipelineTransportAction.java | 2 +- .../ingest/GetPipelineTransportAction.java | 2 +- .../ingest/PutPipelineTransportAction.java | 2 +- .../SimulatePipelineTransportAction.java | 3 +- .../elasticsearch/bootstrap/Bootstrap.java | 3 +- .../cli/EnvironmentAwareCommand.java | 2 +- .../client/transport/TransportClient.java | 6 +- .../common/network/NetworkModule.java | 5 +- .../common/settings/SettingsModule.java | 6 +- .../org/elasticsearch/http/HttpServer.java | 206 ---------------- .../elasticsearch/http/HttpServerAdapter.java | 30 --- .../http/HttpServerTransport.java | 16 +- .../index/translog/TranslogToolCli.java | 4 - .../InternalSettingsPreparer.java | 9 +- .../java/org/elasticsearch/node/Node.java | 41 ++-- .../org/elasticsearch/node/NodeModule.java | 2 - .../node/{service => }/NodeService.java | 22 +- .../plugins/InstallPluginCommand.java | 3 - .../plugins/ListPluginsCommand.java | 3 - .../elasticsearch/plugins/NetworkPlugin.java | 7 +- .../elasticsearch/rest/RestController.java | 132 +++++++++- .../common/network/NetworkModuleTests.java | 15 +- .../elasticsearch/http/HttpServerTests.java | 231 ------------------ ...gestProcessorNotInstalledOnAllNodesIT.java | 2 +- .../InternalSettingsPreparerTests.java | 1 + .../rest/RestControllerTests.java | 203 ++++++++++++++- .../cluster/RestNodesStatsActionTests.java | 3 +- .../indices/RestIndicesStatsActionTests.java | 2 +- .../action/cat/RestIndicesActionTests.java | 2 +- .../action/cat/RestRecoveryActionTests.java | 2 +- .../netty4/Netty4HttpServerTransport.java | 16 +- .../elasticsearch/transport/Netty4Plugin.java | 9 +- .../http/netty4/Netty4HttpChannelTests.java | 9 +- .../Netty4HttpServerPipeliningTests.java | 2 +- .../Netty4HttpServerTransportTests.java | 9 +- .../java/org/elasticsearch/node/MockNode.java | 4 - .../test/AbstractQueryTestCase.java | 2 +- .../test/InternalTestCluster.java | 3 +- 43 files changed, 464 insertions(+), 583 deletions(-) delete mode 100644 core/src/main/java/org/elasticsearch/http/HttpServer.java delete mode 100644 core/src/main/java/org/elasticsearch/http/HttpServerAdapter.java rename core/src/main/java/org/elasticsearch/node/{internal => }/InternalSettingsPreparer.java (96%) rename core/src/main/java/org/elasticsearch/node/{service => }/NodeService.java (88%) delete mode 100644 core/src/test/java/org/elasticsearch/http/HttpServerTests.java diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml index a3e5af6c4d4..a089d677dc9 100644 --- a/buildSrc/src/main/resources/checkstyle_suppressions.xml +++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml @@ -430,7 +430,6 @@ - diff --git a/core/src/main/java/org/elasticsearch/action/ActionModule.java b/core/src/main/java/org/elasticsearch/action/ActionModule.java index a24ed5f8083..0560b11cbb8 100644 --- a/core/src/main/java/org/elasticsearch/action/ActionModule.java +++ b/core/src/main/java/org/elasticsearch/action/ActionModule.java @@ -19,7 +19,6 @@ package org.elasticsearch.action; -import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -196,6 +195,7 @@ import org.elasticsearch.action.termvectors.TransportShardMultiTermsVectorAction import org.elasticsearch.action.termvectors.TransportTermVectorsAction; import org.elasticsearch.action.update.TransportUpdateAction; import org.elasticsearch.action.update.UpdateAction; +import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.common.NamedRegistry; import org.elasticsearch.common.inject.AbstractModule; @@ -205,6 +205,7 @@ import org.elasticsearch.common.logging.ESLoggerFactory; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.ActionPlugin.ActionHandler; import org.elasticsearch.rest.RestController; @@ -332,7 +333,8 @@ public class ActionModule extends AbstractModule { private final RestController restController; public ActionModule(boolean transportClient, Settings settings, IndexNameExpressionResolver resolver, - ClusterSettings clusterSettings, ThreadPool threadPool, List actionPlugins) { + ClusterSettings clusterSettings, ThreadPool threadPool, List actionPlugins, + NodeClient nodeClient, CircuitBreakerService circuitBreakerService) { this.transportClient = transportClient; this.settings = settings; this.actionPlugins = actionPlugins; @@ -352,9 +354,14 @@ public class ActionModule extends AbstractModule { restWrapper = newRestWrapper; } } - restController = new RestController(settings, headers, restWrapper); + if (transportClient) { + restController = null; + } else { + restController = new RestController(settings, headers, restWrapper, nodeClient, circuitBreakerService); + } } + public Map> getActions() { return actions; } @@ -648,8 +655,10 @@ public class ActionModule extends AbstractModule { } } - // Bind the RestController which is required (by Node) even if rest isn't enabled. - bind(RestController.class).toInstance(restController); + if (restController != null) { + // Bind the RestController which is required (by Node) even if rest isn't enabled. + bind(RestController.class).toInstance(restController); + } // Setup the RestHandlers if (NetworkModule.HTTP_ENABLED.get(settings)) { diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java index c26554b25e0..7d80b84d5d2 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java @@ -29,7 +29,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.node.service.NodeService; +import org.elasticsearch.node.NodeService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index b4cef38d28d..e4034582f96 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -29,7 +29,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.node.service.NodeService; +import org.elasticsearch.node.NodeService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 45eb83dd9e1..d77bc599258 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -39,7 +39,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.node.service.NodeService; +import org.elasticsearch.node.NodeService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; diff --git a/core/src/main/java/org/elasticsearch/action/ingest/DeletePipelineTransportAction.java b/core/src/main/java/org/elasticsearch/action/ingest/DeletePipelineTransportAction.java index 74ce894b053..45cb83634f8 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/DeletePipelineTransportAction.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/DeletePipelineTransportAction.java @@ -30,7 +30,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.ingest.PipelineStore; -import org.elasticsearch.node.service.NodeService; +import org.elasticsearch.node.NodeService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; diff --git a/core/src/main/java/org/elasticsearch/action/ingest/GetPipelineTransportAction.java b/core/src/main/java/org/elasticsearch/action/ingest/GetPipelineTransportAction.java index 8bac5c7b804..f64b36d47ae 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/GetPipelineTransportAction.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/GetPipelineTransportAction.java @@ -30,7 +30,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.ingest.PipelineStore; -import org.elasticsearch.node.service.NodeService; +import org.elasticsearch.node.NodeService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; diff --git a/core/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java b/core/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java index 82cd8d8eb7b..7dde9818049 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java @@ -36,7 +36,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.ingest.PipelineStore; import org.elasticsearch.ingest.IngestInfo; -import org.elasticsearch.node.service.NodeService; +import org.elasticsearch.node.NodeService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; diff --git a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java index 4f9a219c8ad..61fd400a1d3 100644 --- a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java +++ b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java @@ -19,7 +19,6 @@ package org.elasticsearch.action.ingest; -import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; @@ -28,7 +27,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.ingest.PipelineStore; -import org.elasticsearch.node.service.NodeService; +import org.elasticsearch.node.NodeService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; diff --git a/core/src/main/java/org/elasticsearch/bootstrap/Bootstrap.java b/core/src/main/java/org/elasticsearch/bootstrap/Bootstrap.java index 33f3f922fa4..2b47908c352 100644 --- a/core/src/main/java/org/elasticsearch/bootstrap/Bootstrap.java +++ b/core/src/main/java/org/elasticsearch/bootstrap/Bootstrap.java @@ -40,7 +40,6 @@ import org.elasticsearch.common.logging.LogConfigurator; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.network.IfConfig; import org.elasticsearch.common.settings.KeyStoreWrapper; -import org.elasticsearch.common.settings.SecureSetting; import org.elasticsearch.common.settings.SecureSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; @@ -50,7 +49,7 @@ import org.elasticsearch.monitor.os.OsProbe; import org.elasticsearch.monitor.process.ProcessProbe; import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeValidationException; -import org.elasticsearch.node.internal.InternalSettingsPreparer; +import org.elasticsearch.node.InternalSettingsPreparer; import java.io.ByteArrayOutputStream; import java.io.IOException; diff --git a/core/src/main/java/org/elasticsearch/cli/EnvironmentAwareCommand.java b/core/src/main/java/org/elasticsearch/cli/EnvironmentAwareCommand.java index b19fc4ca957..8372a6b8ab8 100644 --- a/core/src/main/java/org/elasticsearch/cli/EnvironmentAwareCommand.java +++ b/core/src/main/java/org/elasticsearch/cli/EnvironmentAwareCommand.java @@ -24,7 +24,7 @@ import joptsimple.OptionSpec; import joptsimple.util.KeyValuePair; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; -import org.elasticsearch.node.internal.InternalSettingsPreparer; +import org.elasticsearch.node.InternalSettingsPreparer; import java.util.HashMap; import java.util.Locale; diff --git a/core/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/core/src/main/java/org/elasticsearch/client/transport/TransportClient.java index 803c0e6d1d1..51bed4a4582 100644 --- a/core/src/main/java/org/elasticsearch/client/transport/TransportClient.java +++ b/core/src/main/java/org/elasticsearch/client/transport/TransportClient.java @@ -46,7 +46,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.node.Node; -import org.elasticsearch.node.internal.InternalSettingsPreparer; +import org.elasticsearch.node.InternalSettingsPreparer; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.NetworkPlugin; import org.elasticsearch.plugins.Plugin; @@ -160,7 +160,7 @@ public abstract class TransportClient extends AbstractClient { } modules.add(b -> b.bind(ThreadPool.class).toInstance(threadPool)); ActionModule actionModule = new ActionModule(true, settings, null, settingsModule.getClusterSettings(), - threadPool, pluginsService.filterPlugins(ActionPlugin.class)); + threadPool, pluginsService.filterPlugins(ActionPlugin.class), null, null); modules.add(actionModule); CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(), @@ -170,7 +170,7 @@ public abstract class TransportClient extends AbstractClient { resourcesToClose.add(bigArrays); modules.add(settingsModule); NetworkModule networkModule = new NetworkModule(settings, true, pluginsService.filterPlugins(NetworkPlugin.class), threadPool, - bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService); + bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null); final Transport transport = networkModule.getTransportSupplier().get(); final TransportService transportService = new TransportService(settings, transport, threadPool, networkModule.getTransportInterceptor(), diff --git a/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java b/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java index 04f6b62dde1..81d228da230 100644 --- a/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java +++ b/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java @@ -38,6 +38,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry.FromXContent; import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.plugins.NetworkPlugin; +import org.elasticsearch.rest.RestController; import org.elasticsearch.tasks.RawTaskStatus; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; @@ -109,13 +110,13 @@ public final class NetworkModule { CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, - NetworkService networkService) { + NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) { this.settings = settings; this.transportClient = transportClient; for (NetworkPlugin plugin : plugins) { if (transportClient == false && HTTP_ENABLED.get(settings)) { Map> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays, - circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService); + circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, dispatcher); for (Map.Entry> entry : httpTransportFactory.entrySet()) { registerHttpTransport(entry.getKey(), entry.getValue()); } diff --git a/core/src/main/java/org/elasticsearch/common/settings/SettingsModule.java b/core/src/main/java/org/elasticsearch/common/settings/SettingsModule.java index 60276ce14f7..44d18208803 100644 --- a/core/src/main/java/org/elasticsearch/common/settings/SettingsModule.java +++ b/core/src/main/java/org/elasticsearch/common/settings/SettingsModule.java @@ -54,6 +54,7 @@ public class SettingsModule implements Module { private final Logger logger; private final IndexScopedSettings indexScopedSettings; private final ClusterSettings clusterSettings; + private final SettingsFilter settingsFilter; public SettingsModule(Settings settings, Setting... additionalSettings) { this(settings, Arrays.asList(additionalSettings), Collections.emptyList()); @@ -137,12 +138,13 @@ public class SettingsModule implements Module { final Predicate acceptOnlyClusterSettings = TRIBE_CLIENT_NODE_SETTINGS_PREDICATE.negate(); clusterSettings.validate(settings.filter(acceptOnlyClusterSettings)); validateTribeSettings(settings, clusterSettings); + this.settingsFilter = new SettingsFilter(settings, settingsFilterPattern); } @Override public void configure(Binder binder) { binder.bind(Settings.class).toInstance(settings); - binder.bind(SettingsFilter.class).toInstance(new SettingsFilter(settings, settingsFilterPattern)); + binder.bind(SettingsFilter.class).toInstance(settingsFilter); binder.bind(ClusterSettings.class).toInstance(clusterSettings); binder.bind(IndexScopedSettings.class).toInstance(indexScopedSettings); } @@ -218,4 +220,6 @@ public class SettingsModule implements Module { public ClusterSettings getClusterSettings() { return clusterSettings; } + + public SettingsFilter getSettingsFilter() { return settingsFilter; } } diff --git a/core/src/main/java/org/elasticsearch/http/HttpServer.java b/core/src/main/java/org/elasticsearch/http/HttpServer.java deleted file mode 100644 index 06bc392587a..00000000000 --- a/core/src/main/java/org/elasticsearch/http/HttpServer.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.http; - -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.logging.log4j.util.Supplier; -import org.elasticsearch.client.node.NodeClient; -import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.breaker.CircuitBreaker; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.io.Streams; -import org.elasticsearch.common.io.stream.BytesStreamOutput; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.indices.breaker.CircuitBreakerService; -import org.elasticsearch.rest.BytesRestResponse; -import org.elasticsearch.rest.RestChannel; -import org.elasticsearch.rest.RestController; -import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.RestResponse; -import org.elasticsearch.rest.RestStatus; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.elasticsearch.rest.RestStatus.FORBIDDEN; -import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR; - -/** - * A component to serve http requests, backed by rest handlers. - */ -public class HttpServer extends AbstractLifecycleComponent implements HttpServerAdapter { - private final HttpServerTransport transport; - - private final RestController restController; - - private final NodeClient client; - - private final CircuitBreakerService circuitBreakerService; - - public HttpServer(Settings settings, HttpServerTransport transport, RestController restController, - NodeClient client, CircuitBreakerService circuitBreakerService) { - super(settings); - this.transport = transport; - this.restController = restController; - this.client = client; - this.circuitBreakerService = circuitBreakerService; - transport.httpServerAdapter(this); - } - - - @Override - protected void doStart() { - transport.start(); - if (logger.isInfoEnabled()) { - logger.info("{}", transport.boundAddress()); - } - } - - @Override - protected void doStop() { - transport.stop(); - } - - @Override - protected void doClose() { - transport.close(); - } - - public HttpInfo info() { - return transport.info(); - } - - public HttpStats stats() { - return transport.stats(); - } - - public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) { - if (request.rawPath().equals("/favicon.ico")) { - handleFavicon(request, channel); - return; - } - RestChannel responseChannel = channel; - try { - int contentLength = request.content().length(); - if (restController.canTripCircuitBreaker(request)) { - inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, ""); - } else { - inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength); - } - // iff we could reserve bytes for the request we need to send the response also over this channel - responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength); - restController.dispatchRequest(request, responseChannel, client, threadContext); - } catch (Exception e) { - try { - responseChannel.sendResponse(new BytesRestResponse(channel, e)); - } catch (Exception inner) { - inner.addSuppressed(e); - logger.error((Supplier) () -> - new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner); - } - } - } - - void handleFavicon(RestRequest request, RestChannel channel) { - if (request.method() == RestRequest.Method.GET) { - try { - try (InputStream stream = getClass().getResourceAsStream("/config/favicon.ico")) { - ByteArrayOutputStream out = new ByteArrayOutputStream(); - Streams.copy(stream, out); - BytesRestResponse restResponse = new BytesRestResponse(RestStatus.OK, "image/x-icon", out.toByteArray()); - channel.sendResponse(restResponse); - } - } catch (IOException e) { - channel.sendResponse(new BytesRestResponse(INTERNAL_SERVER_ERROR, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY)); - } - } else { - channel.sendResponse(new BytesRestResponse(FORBIDDEN, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY)); - } - } - - private static final class ResourceHandlingHttpChannel implements RestChannel { - private final RestChannel delegate; - private final CircuitBreakerService circuitBreakerService; - private final int contentLength; - private final AtomicBoolean closed = new AtomicBoolean(); - - public ResourceHandlingHttpChannel(RestChannel delegate, CircuitBreakerService circuitBreakerService, int contentLength) { - this.delegate = delegate; - this.circuitBreakerService = circuitBreakerService; - this.contentLength = contentLength; - } - - @Override - public XContentBuilder newBuilder() throws IOException { - return delegate.newBuilder(); - } - - @Override - public XContentBuilder newErrorBuilder() throws IOException { - return delegate.newErrorBuilder(); - } - - @Override - public XContentBuilder newBuilder(@Nullable BytesReference autoDetectSource, boolean useFiltering) throws IOException { - return delegate.newBuilder(autoDetectSource, useFiltering); - } - - @Override - public BytesStreamOutput bytesOutput() { - return delegate.bytesOutput(); - } - - @Override - public RestRequest request() { - return delegate.request(); - } - - @Override - public boolean detailedErrorsEnabled() { - return delegate.detailedErrorsEnabled(); - } - - @Override - public void sendResponse(RestResponse response) { - close(); - delegate.sendResponse(response); - } - - private void close() { - // attempt to close once atomically - if (closed.compareAndSet(false, true) == false) { - throw new IllegalStateException("Channel is already closed"); - } - inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(-contentLength); - } - - } - - private static CircuitBreaker inFlightRequestsBreaker(CircuitBreakerService circuitBreakerService) { - // We always obtain a fresh breaker to reflect changes to the breaker configuration. - return circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS); - } -} diff --git a/core/src/main/java/org/elasticsearch/http/HttpServerAdapter.java b/core/src/main/java/org/elasticsearch/http/HttpServerAdapter.java deleted file mode 100644 index a7e61143893..00000000000 --- a/core/src/main/java/org/elasticsearch/http/HttpServerAdapter.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.http; - -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.rest.RestChannel; -import org.elasticsearch.rest.RestRequest; - -public interface HttpServerAdapter { - - void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext context); - -} diff --git a/core/src/main/java/org/elasticsearch/http/HttpServerTransport.java b/core/src/main/java/org/elasticsearch/http/HttpServerTransport.java index 4dc4a888d8a..89c04198e7f 100644 --- a/core/src/main/java/org/elasticsearch/http/HttpServerTransport.java +++ b/core/src/main/java/org/elasticsearch/http/HttpServerTransport.java @@ -21,6 +21,9 @@ package org.elasticsearch.http; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.transport.BoundTransportAddress; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestRequest; public interface HttpServerTransport extends LifecycleComponent { @@ -33,6 +36,15 @@ public interface HttpServerTransport extends LifecycleComponent { HttpStats stats(); - void httpServerAdapter(HttpServerAdapter httpServerAdapter); - + @FunctionalInterface + interface Dispatcher { + /** + * Dispatches the {@link RestRequest} to the relevant request handler or responds to the given rest channel directly if + * the request can't be handled by any request handler. + * @param request the request to dispatch + * @param channel the response channel of this request + * @param threadContext the nodes thread context + */ + void dispatch(RestRequest request, RestChannel channel, ThreadContext threadContext); + } } diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogToolCli.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogToolCli.java index 3b77466a916..944296d6813 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/TranslogToolCli.java +++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogToolCli.java @@ -21,10 +21,6 @@ package org.elasticsearch.index.translog; import org.elasticsearch.cli.MultiCommand; import org.elasticsearch.cli.Terminal; -import org.elasticsearch.common.logging.LogConfigurator; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.env.Environment; -import org.elasticsearch.node.internal.InternalSettingsPreparer; /** * Class encapsulating and dispatching commands from the {@code elasticsearch-translog} command line tool diff --git a/core/src/main/java/org/elasticsearch/node/internal/InternalSettingsPreparer.java b/core/src/main/java/org/elasticsearch/node/InternalSettingsPreparer.java similarity index 96% rename from core/src/main/java/org/elasticsearch/node/internal/InternalSettingsPreparer.java rename to core/src/main/java/org/elasticsearch/node/InternalSettingsPreparer.java index 840378ffd08..1b3ffe4327b 100644 --- a/core/src/main/java/org/elasticsearch/node/internal/InternalSettingsPreparer.java +++ b/core/src/main/java/org/elasticsearch/node/InternalSettingsPreparer.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.node.internal; +package org.elasticsearch.node; import java.io.IOException; import java.nio.file.Files; @@ -106,7 +106,8 @@ public class InternalSettingsPreparer { } } if (foundSuffixes.size() > 1) { - throw new SettingsException("multiple settings files found with suffixes: " + Strings.collectionToDelimitedString(foundSuffixes, ",")); + throw new SettingsException("multiple settings files found with suffixes: " + + Strings.collectionToDelimitedString(foundSuffixes, ",")); } // re-initialize settings now that the config file has been loaded @@ -195,7 +196,9 @@ public class InternalSettingsPreparer { private static String promptForValue(String key, Terminal terminal, boolean secret) { if (terminal == null) { - throw new UnsupportedOperationException("found property [" + key + "] with value [" + (secret ? SECRET_PROMPT_VALUE : TEXT_PROMPT_VALUE) +"]. prompting for property values is only supported when running elasticsearch in the foreground"); + throw new UnsupportedOperationException("found property [" + key + "] with value [" + + (secret ? SECRET_PROMPT_VALUE : TEXT_PROMPT_VALUE) + + "]. prompting for property values is only supported when running elasticsearch in the foreground"); } if (secret) { diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 3fcda6c0a3b..97ab20c7767 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -84,7 +84,6 @@ import org.elasticsearch.gateway.GatewayAllocator; import org.elasticsearch.gateway.GatewayModule; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.gateway.MetaStateService; -import org.elasticsearch.http.HttpServer; import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.indices.IndicesModule; @@ -101,8 +100,6 @@ import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.monitor.MonitorService; import org.elasticsearch.monitor.jvm.JvmInfo; -import org.elasticsearch.node.internal.InternalSettingsPreparer; -import org.elasticsearch.node.service.NodeService; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.AnalysisPlugin; import org.elasticsearch.plugins.ClusterPlugin; @@ -117,6 +114,7 @@ import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.plugins.ScriptPlugin; import org.elasticsearch.plugins.SearchPlugin; import org.elasticsearch.repositories.RepositoriesModule; +import org.elasticsearch.rest.RestController; import org.elasticsearch.script.ScriptModule; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchModule; @@ -342,14 +340,18 @@ public class Node implements Closeable { modules.add(clusterModule); IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class)); modules.add(indicesModule); + SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class)); - ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(), - settingsModule.getClusterSettings(), threadPool, pluginsService.filterPlugins(ActionPlugin.class)); - modules.add(actionModule); - modules.add(new GatewayModule()); CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(), settingsModule.getClusterSettings()); resourcesToClose.add(circuitBreakerService); + ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(), + settingsModule.getClusterSettings(), threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, + circuitBreakerService); + modules.add(actionModule); + modules.add(new GatewayModule()); + + BigArrays bigArrays = createBigArrays(settings, circuitBreakerService); resourcesToClose.add(bigArrays); modules.add(settingsModule); @@ -388,30 +390,35 @@ public class Node implements Closeable { pluginsService.filterPlugins(Plugin.class).stream() .map(Plugin::getCustomMetaDataUpgrader) .collect(Collectors.toList()); + final RestController restController = actionModule.getRestController(); final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class), - threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService); + threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, + restController::dispatchRequest); final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders); final Transport transport = networkModule.getTransportSupplier().get(); final TransportService transportService = newTransportService(settings, transport, threadPool, networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings()); final Consumer httpBind; + final HttpServerTransport httpServerTransport; if (networkModule.isHttpEnabled()) { - HttpServerTransport httpServerTransport = networkModule.getHttpServerTransportSupplier().get(); - HttpServer httpServer = new HttpServer(settings, httpServerTransport, actionModule.getRestController(), client, - circuitBreakerService); + httpServerTransport = networkModule.getHttpServerTransportSupplier().get(); httpBind = b -> { - b.bind(HttpServer.class).toInstance(httpServer); b.bind(HttpServerTransport.class).toInstance(httpServerTransport); }; } else { httpBind = b -> { - b.bind(HttpServer.class).toProvider(Providers.of(null)); + b.bind(HttpServerTransport.class).toProvider(Providers.of(null)); }; + httpServerTransport = null; } - final DiscoveryModule discoveryModule = new DiscoveryModule(this.settings, threadPool, transportService, namedWriteableRegistry, networkService, clusterService, pluginsService.filterPlugins(DiscoveryPlugin.class)); + NodeService nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(), + transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(), + httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter()); + modules.add(b -> { + b.bind(NodeService.class).toInstance(nodeService); b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry); b.bind(PluginsService.class).toInstance(pluginsService); b.bind(Client.class).toInstance(client); @@ -628,7 +635,7 @@ public class Node implements Closeable { } if (NetworkModule.HTTP_ENABLED.get(settings)) { - injector.getInstance(HttpServer.class).start(); + injector.getInstance(HttpServerTransport.class).start(); } // start nodes now, after the http server, because it may take some time @@ -658,7 +665,7 @@ public class Node implements Closeable { injector.getInstance(TribeService.class).stop(); injector.getInstance(ResourceWatcherService.class).stop(); if (NetworkModule.HTTP_ENABLED.get(settings)) { - injector.getInstance(HttpServer.class).stop(); + injector.getInstance(HttpServerTransport.class).stop(); } injector.getInstance(SnapshotsService.class).stop(); @@ -708,7 +715,7 @@ public class Node implements Closeable { toClose.add(injector.getInstance(NodeService.class)); toClose.add(() -> stopWatch.stop().start("http")); if (NetworkModule.HTTP_ENABLED.get(settings)) { - toClose.add(injector.getInstance(HttpServer.class)); + toClose.add(injector.getInstance(HttpServerTransport.class)); } toClose.add(() -> stopWatch.stop().start("snapshot_service")); toClose.add(injector.getInstance(SnapshotsService.class)); diff --git a/core/src/main/java/org/elasticsearch/node/NodeModule.java b/core/src/main/java/org/elasticsearch/node/NodeModule.java index 6a8f8b90681..929e889503e 100644 --- a/core/src/main/java/org/elasticsearch/node/NodeModule.java +++ b/core/src/main/java/org/elasticsearch/node/NodeModule.java @@ -22,7 +22,6 @@ package org.elasticsearch.node; import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.monitor.MonitorService; -import org.elasticsearch.node.service.NodeService; public class NodeModule extends AbstractModule { @@ -38,7 +37,6 @@ public class NodeModule extends AbstractModule { protected void configure() { bind(Node.class).toInstance(node); bind(MonitorService.class).toInstance(monitorService); - bind(NodeService.class).asEagerSingleton(); bind(DiskThresholdMonitor.class).asEagerSingleton(); } } diff --git a/core/src/main/java/org/elasticsearch/node/service/NodeService.java b/core/src/main/java/org/elasticsearch/node/NodeService.java similarity index 88% rename from core/src/main/java/org/elasticsearch/node/service/NodeService.java rename to core/src/main/java/org/elasticsearch/node/NodeService.java index 7d9a148b271..cb245487152 100644 --- a/core/src/main/java/org/elasticsearch/node/service/NodeService.java +++ b/core/src/main/java/org/elasticsearch/node/NodeService.java @@ -17,7 +17,7 @@ * under the License. */ -package org.elasticsearch.node.service; +package org.elasticsearch.node; import org.elasticsearch.Build; import org.elasticsearch.Version; @@ -27,11 +27,10 @@ import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.http.HttpServer; +import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.ingest.IngestService; @@ -55,17 +54,16 @@ public class NodeService extends AbstractComponent implements Closeable { private final IngestService ingestService; private final SettingsFilter settingsFilter; private ScriptService scriptService; + private final HttpServerTransport httpServerTransport; - @Nullable - private final HttpServer httpServer; private final Discovery discovery; - @Inject - public NodeService(Settings settings, ThreadPool threadPool, MonitorService monitorService, Discovery discovery, + NodeService(Settings settings, ThreadPool threadPool, MonitorService monitorService, Discovery discovery, TransportService transportService, IndicesService indicesService, PluginsService pluginService, - CircuitBreakerService circuitBreakerService, ScriptService scriptService, @Nullable HttpServer httpServer, - IngestService ingestService, ClusterService clusterService, SettingsFilter settingsFilter) { + CircuitBreakerService circuitBreakerService, ScriptService scriptService, + @Nullable HttpServerTransport httpServerTransport, IngestService ingestService, ClusterService clusterService, + SettingsFilter settingsFilter) { super(settings); this.threadPool = threadPool; this.monitorService = monitorService; @@ -74,7 +72,7 @@ public class NodeService extends AbstractComponent implements Closeable { this.discovery = discovery; this.pluginService = pluginService; this.circuitBreakerService = circuitBreakerService; - this.httpServer = httpServer; + this.httpServerTransport = httpServerTransport; this.ingestService = ingestService; this.settingsFilter = settingsFilter; this.scriptService = scriptService; @@ -91,7 +89,7 @@ public class NodeService extends AbstractComponent implements Closeable { jvm ? monitorService.jvmService().info() : null, threadPool ? this.threadPool.info() : null, transport ? transportService.info() : null, - http ? (httpServer == null ? null : httpServer.info()) : null, + http ? (httpServerTransport == null ? null : httpServerTransport.info()) : null, plugin ? (pluginService == null ? null : pluginService.info()) : null, ingest ? (ingestService == null ? null : ingestService.info()) : null, indices ? indicesService.getTotalIndexingBufferBytes() : null @@ -111,7 +109,7 @@ public class NodeService extends AbstractComponent implements Closeable { threadPool ? this.threadPool.stats() : null, fs ? monitorService.fsService().stats() : null, transport ? transportService.stats() : null, - http ? (httpServer == null ? null : httpServer.stats()) : null, + http ? (httpServerTransport == null ? null : httpServerTransport.stats()) : null, circuitBreaker ? circuitBreakerService.stats() : null, script ? scriptService.stats() : null, discoveryStats ? discovery.stats() : null, diff --git a/core/src/main/java/org/elasticsearch/plugins/InstallPluginCommand.java b/core/src/main/java/org/elasticsearch/plugins/InstallPluginCommand.java index dea6fcba312..b502b2a4016 100644 --- a/core/src/main/java/org/elasticsearch/plugins/InstallPluginCommand.java +++ b/core/src/main/java/org/elasticsearch/plugins/InstallPluginCommand.java @@ -33,9 +33,7 @@ import org.elasticsearch.cli.UserException; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.hash.MessageDigests; import org.elasticsearch.common.io.FileSystemUtils; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; -import org.elasticsearch.node.internal.InternalSettingsPreparer; import java.io.BufferedReader; import java.io.IOException; @@ -63,7 +61,6 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Locale; -import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.TreeSet; diff --git a/core/src/main/java/org/elasticsearch/plugins/ListPluginsCommand.java b/core/src/main/java/org/elasticsearch/plugins/ListPluginsCommand.java index 3f21c44a8f4..a674e7c6e24 100644 --- a/core/src/main/java/org/elasticsearch/plugins/ListPluginsCommand.java +++ b/core/src/main/java/org/elasticsearch/plugins/ListPluginsCommand.java @@ -22,9 +22,7 @@ package org.elasticsearch.plugins; import joptsimple.OptionSet; import org.elasticsearch.cli.EnvironmentAwareCommand; import org.elasticsearch.cli.Terminal; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; -import org.elasticsearch.node.internal.InternalSettingsPreparer; import java.io.IOException; import java.nio.file.DirectoryStream; @@ -33,7 +31,6 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; /** * A command for the plugin cli to list plugins installed in elasticsearch. diff --git a/core/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java b/core/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java index ceb7e077e11..33fab61c24a 100644 --- a/core/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java +++ b/core/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java @@ -69,8 +69,11 @@ public interface NetworkPlugin { * See {@link org.elasticsearch.common.network.NetworkModule#HTTP_TYPE_SETTING} to configure a specific implementation. */ default Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, - CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, - NamedXContentRegistry xContentRegistry, NetworkService networkService) { + CircuitBreakerService circuitBreakerService, + NamedWriteableRegistry namedWriteableRegistry, + NamedXContentRegistry xContentRegistry, + NetworkService networkService, + HttpServerTransport.Dispatcher dispatcher) { return Collections.emptyMap(); } } diff --git a/core/src/main/java/org/elasticsearch/rest/RestController.java b/core/src/main/java/org/elasticsearch/rest/RestController.java index c701e8ff0ee..5ac82b7e454 100644 --- a/core/src/main/java/org/elasticsearch/rest/RestController.java +++ b/core/src/main/java/org/elasticsearch/rest/RestController.java @@ -19,23 +19,35 @@ package org.elasticsearch.rest; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.UnaryOperator; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.path.PathTrie; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.http.HttpServerTransport; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import static org.elasticsearch.rest.RestStatus.BAD_REQUEST; +import static org.elasticsearch.rest.RestStatus.FORBIDDEN; +import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR; import static org.elasticsearch.rest.RestStatus.OK; public class RestController extends AbstractComponent { @@ -48,18 +60,27 @@ public class RestController extends AbstractComponent { private final UnaryOperator handlerWrapper; + private final NodeClient client; + + private final CircuitBreakerService circuitBreakerService; + /** Rest headers that are copied to internal requests made during a rest request. */ private final Set headersToCopy; - public RestController(Settings settings, Set headersToCopy, UnaryOperator handlerWrapper) { + public RestController(Settings settings, Set headersToCopy, UnaryOperator handlerWrapper, + NodeClient client, CircuitBreakerService circuitBreakerService) { super(settings); this.headersToCopy = headersToCopy; if (handlerWrapper == null) { handlerWrapper = h -> h; // passthrough if no wrapper set } this.handlerWrapper = handlerWrapper; + this.client = client; + this.circuitBreakerService = circuitBreakerService; } + + /** * Registers a REST handler to be executed when the provided {@code method} and {@code path} match the request. * @@ -137,7 +158,34 @@ public class RestController extends AbstractComponent { return (handler != null) ? handler.canTripCircuitBreaker() : true; } - public void dispatchRequest(final RestRequest request, final RestChannel channel, final NodeClient client, ThreadContext threadContext) throws Exception { + public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) { + if (request.rawPath().equals("/favicon.ico")) { + handleFavicon(request, channel); + return; + } + RestChannel responseChannel = channel; + try { + int contentLength = request.content().length(); + if (canTripCircuitBreaker(request)) { + inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, ""); + } else { + inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength); + } + // iff we could reserve bytes for the request we need to send the response also over this channel + responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength); + dispatchRequest(request, responseChannel, client, threadContext); + } catch (Exception e) { + try { + responseChannel.sendResponse(new BytesRestResponse(channel, e)); + } catch (Exception inner) { + inner.addSuppressed(e); + logger.error((Supplier) () -> + new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner); + } + } + } + + void dispatchRequest(final RestRequest request, final RestChannel channel, final NodeClient client, ThreadContext threadContext) throws Exception { if (!checkRequestParameters(request, channel)) { return; } @@ -223,4 +271,84 @@ public class RestController extends AbstractComponent { // my_index/my_type/http%3A%2F%2Fwww.google.com return request.rawPath(); } + + void handleFavicon(RestRequest request, RestChannel channel) { + if (request.method() == RestRequest.Method.GET) { + try { + try (InputStream stream = getClass().getResourceAsStream("/config/favicon.ico")) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + Streams.copy(stream, out); + BytesRestResponse restResponse = new BytesRestResponse(RestStatus.OK, "image/x-icon", out.toByteArray()); + channel.sendResponse(restResponse); + } + } catch (IOException e) { + channel.sendResponse(new BytesRestResponse(INTERNAL_SERVER_ERROR, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY)); + } + } else { + channel.sendResponse(new BytesRestResponse(FORBIDDEN, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY)); + } + } + + private static final class ResourceHandlingHttpChannel implements RestChannel { + private final RestChannel delegate; + private final CircuitBreakerService circuitBreakerService; + private final int contentLength; + private final AtomicBoolean closed = new AtomicBoolean(); + + public ResourceHandlingHttpChannel(RestChannel delegate, CircuitBreakerService circuitBreakerService, int contentLength) { + this.delegate = delegate; + this.circuitBreakerService = circuitBreakerService; + this.contentLength = contentLength; + } + + @Override + public XContentBuilder newBuilder() throws IOException { + return delegate.newBuilder(); + } + + @Override + public XContentBuilder newErrorBuilder() throws IOException { + return delegate.newErrorBuilder(); + } + + @Override + public XContentBuilder newBuilder(@Nullable BytesReference autoDetectSource, boolean useFiltering) throws IOException { + return delegate.newBuilder(autoDetectSource, useFiltering); + } + + @Override + public BytesStreamOutput bytesOutput() { + return delegate.bytesOutput(); + } + + @Override + public RestRequest request() { + return delegate.request(); + } + + @Override + public boolean detailedErrorsEnabled() { + return delegate.detailedErrorsEnabled(); + } + + @Override + public void sendResponse(RestResponse response) { + close(); + delegate.sendResponse(response); + } + + private void close() { + // attempt to close once atomically + if (closed.compareAndSet(false, true) == false) { + throw new IllegalStateException("Channel is already closed"); + } + inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(-contentLength); + } + + } + + private static CircuitBreaker inFlightRequestsBreaker(CircuitBreakerService circuitBreakerService) { + // We always obtain a fresh breaker to reflect changes to the breaker configuration. + return circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS); + } } diff --git a/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java b/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java index 11799c99cb1..3476c99d484 100644 --- a/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java +++ b/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java @@ -30,7 +30,6 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.http.HttpInfo; -import org.elasticsearch.http.HttpServerAdapter; import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.http.HttpStats; import org.elasticsearch.indices.breaker.CircuitBreakerService; @@ -89,8 +88,6 @@ public class NetworkModuleTests extends ModuleTestCase { public HttpStats stats() { return null; } - @Override - public void httpServerAdapter(HttpServerAdapter httpServerAdapter) {} } @@ -155,7 +152,8 @@ public class NetworkModuleTests extends ModuleTestCase { CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, - NetworkService networkService) { + NetworkService networkService, + HttpServerTransport.Dispatcher requestDispatcher) { return Collections.singletonMap("custom", custom); } }); @@ -195,7 +193,8 @@ public class NetworkModuleTests extends ModuleTestCase { CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, - NetworkService networkService) { + NetworkService networkService, + HttpServerTransport.Dispatcher requestDispatcher) { Map> supplierMap = new HashMap<>(); supplierMap.put("custom", custom); supplierMap.put("default_custom", def); @@ -228,7 +227,8 @@ public class NetworkModuleTests extends ModuleTestCase { CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, - NetworkService networkService) { + NetworkService networkService, + HttpServerTransport.Dispatcher requestDispatcher) { Map> supplierMap = new HashMap<>(); supplierMap.put("custom", custom); supplierMap.put("default_custom", def); @@ -276,6 +276,7 @@ public class NetworkModuleTests extends ModuleTestCase { } private NetworkModule newNetworkModule(Settings settings, boolean transportClient, NetworkPlugin... plugins) { - return new NetworkModule(settings, transportClient, Arrays.asList(plugins), threadPool, null, null, null, xContentRegistry(), null); + return new NetworkModule(settings, transportClient, Arrays.asList(plugins), threadPool, null, null, null, xContentRegistry(), null, + (a, b, c) -> {}); } } diff --git a/core/src/test/java/org/elasticsearch/http/HttpServerTests.java b/core/src/test/java/org/elasticsearch/http/HttpServerTests.java deleted file mode 100644 index db9eebfe5fc..00000000000 --- a/core/src/test/java/org/elasticsearch/http/HttpServerTests.java +++ /dev/null @@ -1,231 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.elasticsearch.http; - -import java.util.Collections; -import java.util.Map; - -import org.elasticsearch.common.breaker.CircuitBreaker; -import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.component.AbstractLifecycleComponent; -import org.elasticsearch.common.settings.ClusterSettings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.BoundTransportAddress; -import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.indices.breaker.CircuitBreakerService; -import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; -import org.elasticsearch.rest.AbstractRestChannel; -import org.elasticsearch.rest.BytesRestResponse; -import org.elasticsearch.rest.RestController; -import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.RestResponse; -import org.elasticsearch.rest.RestStatus; -import org.elasticsearch.test.ESTestCase; -import org.junit.Before; - -public class HttpServerTests extends ESTestCase { - private static final ByteSizeValue BREAKER_LIMIT = new ByteSizeValue(20); - private HttpServer httpServer; - private CircuitBreaker inFlightRequestsBreaker; - - @Before - public void setup() { - Settings settings = Settings.EMPTY; - CircuitBreakerService circuitBreakerService = new HierarchyCircuitBreakerService( - Settings.builder() - .put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), BREAKER_LIMIT) - .build(), - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); - // we can do this here only because we know that we don't adjust breaker settings dynamically in the test - inFlightRequestsBreaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS); - - HttpServerTransport httpServerTransport = new TestHttpServerTransport(); - RestController restController = new RestController(settings, Collections.emptySet(), null); - restController.registerHandler(RestRequest.Method.GET, "/", - (request, channel, client) -> channel.sendResponse( - new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY))); - restController.registerHandler(RestRequest.Method.GET, "/error", (request, channel, client) -> { - throw new IllegalArgumentException("test error"); - }); - - httpServer = new HttpServer(settings, httpServerTransport, restController, null, circuitBreakerService); - httpServer.start(); - } - - public void testDispatchRequestAddsAndFreesBytesOnSuccess() { - int contentLength = BREAKER_LIMIT.bytesAsInt(); - String content = randomAsciiOfLength(contentLength); - TestRestRequest request = new TestRestRequest("/", content); - AssertingChannel channel = new AssertingChannel(request, true, RestStatus.OK); - - httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY)); - - assertEquals(0, inFlightRequestsBreaker.getTrippedCount()); - assertEquals(0, inFlightRequestsBreaker.getUsed()); - } - - public void testDispatchRequestAddsAndFreesBytesOnError() { - int contentLength = BREAKER_LIMIT.bytesAsInt(); - String content = randomAsciiOfLength(contentLength); - TestRestRequest request = new TestRestRequest("/error", content); - AssertingChannel channel = new AssertingChannel(request, true, RestStatus.BAD_REQUEST); - - httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY)); - - assertEquals(0, inFlightRequestsBreaker.getTrippedCount()); - assertEquals(0, inFlightRequestsBreaker.getUsed()); - } - - public void testDispatchRequestAddsAndFreesBytesOnlyOnceOnError() { - int contentLength = BREAKER_LIMIT.bytesAsInt(); - String content = randomAsciiOfLength(contentLength); - // we will produce an error in the rest handler and one more when sending the error response - TestRestRequest request = new TestRestRequest("/error", content); - ExceptionThrowingChannel channel = new ExceptionThrowingChannel(request, true); - - httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY)); - - assertEquals(0, inFlightRequestsBreaker.getTrippedCount()); - assertEquals(0, inFlightRequestsBreaker.getUsed()); - } - - public void testDispatchRequestLimitsBytes() { - int contentLength = BREAKER_LIMIT.bytesAsInt() + 1; - String content = randomAsciiOfLength(contentLength); - TestRestRequest request = new TestRestRequest("/", content); - AssertingChannel channel = new AssertingChannel(request, true, RestStatus.SERVICE_UNAVAILABLE); - - httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY)); - - assertEquals(1, inFlightRequestsBreaker.getTrippedCount()); - assertEquals(0, inFlightRequestsBreaker.getUsed()); - } - - private static final class TestHttpServerTransport extends AbstractLifecycleComponent implements - HttpServerTransport { - - public TestHttpServerTransport() { - super(Settings.EMPTY); - } - - @Override - protected void doStart() { - } - - @Override - protected void doStop() { - } - - @Override - protected void doClose() { - } - - @Override - public BoundTransportAddress boundAddress() { - TransportAddress transportAddress = buildNewFakeTransportAddress(); - return new BoundTransportAddress(new TransportAddress[] {transportAddress} ,transportAddress); - } - - @Override - public HttpInfo info() { - return null; - } - - @Override - public HttpStats stats() { - return null; - } - - @Override - public void httpServerAdapter(HttpServerAdapter httpServerAdapter) { - - } - } - - private static final class AssertingChannel extends AbstractRestChannel { - private final RestStatus expectedStatus; - - protected AssertingChannel(RestRequest request, boolean detailedErrorsEnabled, RestStatus expectedStatus) { - super(request, detailedErrorsEnabled); - this.expectedStatus = expectedStatus; - } - - @Override - public void sendResponse(RestResponse response) { - assertEquals(expectedStatus, response.status()); - } - } - - private static final class ExceptionThrowingChannel extends AbstractRestChannel { - - protected ExceptionThrowingChannel(RestRequest request, boolean detailedErrorsEnabled) { - super(request, detailedErrorsEnabled); - } - - @Override - public void sendResponse(RestResponse response) { - throw new IllegalStateException("always throwing an exception for testing"); - } - } - - private static final class TestRestRequest extends RestRequest { - - private final BytesReference content; - - private TestRestRequest(String path, String content) { - super(NamedXContentRegistry.EMPTY, Collections.emptyMap(), path); - this.content = new BytesArray(content); - } - - @Override - public Method method() { - return Method.GET; - } - - @Override - public String uri() { - return null; - } - - @Override - public boolean hasContent() { - return true; - } - - @Override - public BytesReference content() { - return content; - } - - @Override - public String header(String name) { - return null; - } - - @Override - public Iterable> headers() { - return null; - } - - } -} diff --git a/core/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java b/core/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java index 02fe0d03c77..83072636b76 100644 --- a/core/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java +++ b/core/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java @@ -22,7 +22,7 @@ package org.elasticsearch.ingest; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.action.ingest.WritePipelineResponse; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.node.service.NodeService; +import org.elasticsearch.node.NodeService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; diff --git a/core/src/test/java/org/elasticsearch/node/internal/InternalSettingsPreparerTests.java b/core/src/test/java/org/elasticsearch/node/internal/InternalSettingsPreparerTests.java index 2dc95f8e9f1..94b3f3737cb 100644 --- a/core/src/test/java/org/elasticsearch/node/internal/InternalSettingsPreparerTests.java +++ b/core/src/test/java/org/elasticsearch/node/internal/InternalSettingsPreparerTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsException; import org.elasticsearch.env.Environment; +import org.elasticsearch.node.InternalSettingsPreparer; import org.elasticsearch.test.ESTestCase; import org.junit.After; import org.junit.Before; diff --git a/core/src/test/java/org/elasticsearch/rest/RestControllerTests.java b/core/src/test/java/org/elasticsearch/rest/RestControllerTests.java index cce5c463759..e7064554908 100644 --- a/core/src/test/java/org/elasticsearch/rest/RestControllerTests.java +++ b/core/src/test/java/org/elasticsearch/rest/RestControllerTests.java @@ -29,11 +29,26 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.UnaryOperator; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.logging.DeprecationLogger; +import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.BoundTransportAddress; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.http.HttpInfo; +import org.elasticsearch.http.HttpServerTransport; +import org.elasticsearch.http.HttpStats; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.rest.FakeRestRequest; +import org.junit.Before; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; @@ -43,10 +58,39 @@ import static org.mockito.Mockito.verify; public class RestControllerTests extends ESTestCase { + private static final ByteSizeValue BREAKER_LIMIT = new ByteSizeValue(20); + private CircuitBreaker inFlightRequestsBreaker; + private RestController restController; + private HierarchyCircuitBreakerService circuitBreakerService; + + @Before + public void setup() { + Settings settings = Settings.EMPTY; + circuitBreakerService = new HierarchyCircuitBreakerService( + Settings.builder() + .put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), BREAKER_LIMIT) + .build(), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + // we can do this here only because we know that we don't adjust breaker settings dynamically in the test + inFlightRequestsBreaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS); + + HttpServerTransport httpServerTransport = new TestHttpServerTransport(); + restController = new RestController(settings, Collections.emptySet(), null, null, circuitBreakerService); + restController.registerHandler(RestRequest.Method.GET, "/", + (request, channel, client) -> channel.sendResponse( + new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY))); + restController.registerHandler(RestRequest.Method.GET, "/error", (request, channel, client) -> { + throw new IllegalArgumentException("test error"); + }); + + httpServerTransport.start(); + } + + public void testApplyRelevantHeaders() throws Exception { final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); Set headers = new HashSet<>(Arrays.asList("header.1", "header.2")); - final RestController restController = new RestController(Settings.EMPTY, headers, null); + final RestController restController = new RestController(Settings.EMPTY, headers, null, null, circuitBreakerService); restController.registerHandler(RestRequest.Method.GET, "/", (RestRequest request, RestChannel channel, NodeClient client) -> { assertEquals("true", threadContext.getHeader("header.1")); @@ -66,7 +110,7 @@ public class RestControllerTests extends ESTestCase { } public void testCanTripCircuitBreaker() throws Exception { - RestController controller = new RestController(Settings.EMPTY, Collections.emptySet(), null); + RestController controller = new RestController(Settings.EMPTY, Collections.emptySet(), null, null, circuitBreakerService); // trip circuit breaker by default controller.registerHandler(RestRequest.Method.GET, "/trip", new FakeRestHandler(true)); controller.registerHandler(RestRequest.Method.GET, "/do-not-trip", new FakeRestHandler(false)); @@ -126,7 +170,8 @@ public class RestControllerTests extends ESTestCase { assertSame(handler, h); return (RestRequest request, RestChannel channel, NodeClient client) -> wrapperCalled.set(true); }; - final RestController restController = new RestController(Settings.EMPTY, Collections.emptySet(), wrapper); + final RestController restController = new RestController(Settings.EMPTY, Collections.emptySet(), wrapper, null, + circuitBreakerService); restController.registerHandler(RestRequest.Method.GET, "/", handler); final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); restController.dispatchRequest(new FakeRestRequest.Builder(xContentRegistry()).build(), null, null, threadContext); @@ -154,4 +199,156 @@ public class RestControllerTests extends ESTestCase { return canTripCircuitBreaker; } } + + public void testDispatchRequestAddsAndFreesBytesOnSuccess() { + int contentLength = BREAKER_LIMIT.bytesAsInt(); + String content = randomAsciiOfLength(contentLength); + TestRestRequest request = new TestRestRequest("/", content); + AssertingChannel channel = new AssertingChannel(request, true, RestStatus.OK); + + restController.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY)); + + assertEquals(0, inFlightRequestsBreaker.getTrippedCount()); + assertEquals(0, inFlightRequestsBreaker.getUsed()); + } + + public void testDispatchRequestAddsAndFreesBytesOnError() { + int contentLength = BREAKER_LIMIT.bytesAsInt(); + String content = randomAsciiOfLength(contentLength); + TestRestRequest request = new TestRestRequest("/error", content); + AssertingChannel channel = new AssertingChannel(request, true, RestStatus.BAD_REQUEST); + + restController.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY)); + + assertEquals(0, inFlightRequestsBreaker.getTrippedCount()); + assertEquals(0, inFlightRequestsBreaker.getUsed()); + } + + public void testDispatchRequestAddsAndFreesBytesOnlyOnceOnError() { + int contentLength = BREAKER_LIMIT.bytesAsInt(); + String content = randomAsciiOfLength(contentLength); + // we will produce an error in the rest handler and one more when sending the error response + TestRestRequest request = new TestRestRequest("/error", content); + ExceptionThrowingChannel channel = new ExceptionThrowingChannel(request, true); + + restController.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY)); + + assertEquals(0, inFlightRequestsBreaker.getTrippedCount()); + assertEquals(0, inFlightRequestsBreaker.getUsed()); + } + + public void testDispatchRequestLimitsBytes() { + int contentLength = BREAKER_LIMIT.bytesAsInt() + 1; + String content = randomAsciiOfLength(contentLength); + TestRestRequest request = new TestRestRequest("/", content); + AssertingChannel channel = new AssertingChannel(request, true, RestStatus.SERVICE_UNAVAILABLE); + + restController.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY)); + + assertEquals(1, inFlightRequestsBreaker.getTrippedCount()); + assertEquals(0, inFlightRequestsBreaker.getUsed()); + } + + private static final class TestHttpServerTransport extends AbstractLifecycleComponent implements + HttpServerTransport { + + public TestHttpServerTransport() { + super(Settings.EMPTY); + } + + @Override + protected void doStart() { + } + + @Override + protected void doStop() { + } + + @Override + protected void doClose() { + } + + @Override + public BoundTransportAddress boundAddress() { + TransportAddress transportAddress = buildNewFakeTransportAddress(); + return new BoundTransportAddress(new TransportAddress[] {transportAddress} ,transportAddress); + } + + @Override + public HttpInfo info() { + return null; + } + + @Override + public HttpStats stats() { + return null; + } + } + + private static final class AssertingChannel extends AbstractRestChannel { + private final RestStatus expectedStatus; + + protected AssertingChannel(RestRequest request, boolean detailedErrorsEnabled, RestStatus expectedStatus) { + super(request, detailedErrorsEnabled); + this.expectedStatus = expectedStatus; + } + + @Override + public void sendResponse(RestResponse response) { + assertEquals(expectedStatus, response.status()); + } + } + + private static final class ExceptionThrowingChannel extends AbstractRestChannel { + + protected ExceptionThrowingChannel(RestRequest request, boolean detailedErrorsEnabled) { + super(request, detailedErrorsEnabled); + } + + @Override + public void sendResponse(RestResponse response) { + throw new IllegalStateException("always throwing an exception for testing"); + } + } + + private static final class TestRestRequest extends RestRequest { + + private final BytesReference content; + + private TestRestRequest(String path, String content) { + super(NamedXContentRegistry.EMPTY, Collections.emptyMap(), path); + this.content = new BytesArray(content); + } + + @Override + public Method method() { + return Method.GET; + } + + @Override + public String uri() { + return null; + } + + @Override + public boolean hasContent() { + return true; + } + + @Override + public BytesReference content() { + return content; + } + + @Override + public String header(String name) { + return null; + } + + @Override + public Iterable> headers() { + return null; + } + + } } diff --git a/core/src/test/java/org/elasticsearch/rest/action/admin/cluster/RestNodesStatsActionTests.java b/core/src/test/java/org/elasticsearch/rest/action/admin/cluster/RestNodesStatsActionTests.java index 9de530d417d..ba478331ca0 100644 --- a/core/src/test/java/org/elasticsearch/rest/action/admin/cluster/RestNodesStatsActionTests.java +++ b/core/src/test/java/org/elasticsearch/rest/action/admin/cluster/RestNodesStatsActionTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.rest.action.admin.cluster; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.test.ESTestCase; @@ -43,7 +44,7 @@ public class RestNodesStatsActionTests extends ESTestCase { @Override public void setUp() throws Exception { super.setUp(); - action = new RestNodesStatsAction(Settings.EMPTY, new RestController(Settings.EMPTY, Collections.emptySet(), null)); + action = new RestNodesStatsAction(Settings.EMPTY, new RestController(Settings.EMPTY, Collections.emptySet(), null, null, null)); } public void testUnrecognizedMetric() throws IOException { diff --git a/core/src/test/java/org/elasticsearch/rest/action/admin/indices/RestIndicesStatsActionTests.java b/core/src/test/java/org/elasticsearch/rest/action/admin/indices/RestIndicesStatsActionTests.java index feac1672c11..0aa6e497836 100644 --- a/core/src/test/java/org/elasticsearch/rest/action/admin/indices/RestIndicesStatsActionTests.java +++ b/core/src/test/java/org/elasticsearch/rest/action/admin/indices/RestIndicesStatsActionTests.java @@ -41,7 +41,7 @@ public class RestIndicesStatsActionTests extends ESTestCase { @Override public void setUp() throws Exception { super.setUp(); - action = new RestIndicesStatsAction(Settings.EMPTY, new RestController(Settings.EMPTY, Collections.emptySet(), null)); + action = new RestIndicesStatsAction(Settings.EMPTY, new RestController(Settings.EMPTY, Collections.emptySet(), null, null, null)); } public void testUnrecognizedMetric() throws IOException { diff --git a/core/src/test/java/org/elasticsearch/rest/action/cat/RestIndicesActionTests.java b/core/src/test/java/org/elasticsearch/rest/action/cat/RestIndicesActionTests.java index 2a22541bbc7..ae66664b456 100644 --- a/core/src/test/java/org/elasticsearch/rest/action/cat/RestIndicesActionTests.java +++ b/core/src/test/java/org/elasticsearch/rest/action/cat/RestIndicesActionTests.java @@ -74,7 +74,7 @@ public class RestIndicesActionTests extends ESTestCase { public void testBuildTable() { final Settings settings = Settings.EMPTY; - final RestController restController = new RestController(settings, Collections.emptySet(), null); + final RestController restController = new RestController(settings, Collections.emptySet(), null, null, null); final RestIndicesAction action = new RestIndicesAction(settings, restController, new IndexNameExpressionResolver(settings)); // build a (semi-)random table diff --git a/core/src/test/java/org/elasticsearch/rest/action/cat/RestRecoveryActionTests.java b/core/src/test/java/org/elasticsearch/rest/action/cat/RestRecoveryActionTests.java index fa93e4a80d3..88623687bf1 100644 --- a/core/src/test/java/org/elasticsearch/rest/action/cat/RestRecoveryActionTests.java +++ b/core/src/test/java/org/elasticsearch/rest/action/cat/RestRecoveryActionTests.java @@ -50,7 +50,7 @@ public class RestRecoveryActionTests extends ESTestCase { public void testRestRecoveryAction() { final Settings settings = Settings.EMPTY; - final RestController restController = new RestController(settings, Collections.emptySet(), null); + final RestController restController = new RestController(settings, Collections.emptySet(), null, null, null); final RestRecoveryAction action = new RestRecoveryAction(settings, restController, restController); final int totalShards = randomIntBetween(1, 32); final int successfulShards = Math.max(0, totalShards - randomIntBetween(1, 2)); diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java index c6c2899e4b3..138ed0a67be 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java @@ -63,7 +63,6 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.http.BindHttpException; import org.elasticsearch.http.HttpInfo; -import org.elasticsearch.http.HttpServerAdapter; import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.http.HttpStats; import org.elasticsearch.http.netty4.cors.Netty4CorsConfig; @@ -210,6 +209,7 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem protected final ByteSizeValue maxCumulationBufferCapacity; protected final int maxCompositeBufferComponents; + private final Dispatcher dispatcher; protected volatile ServerBootstrap serverBootstrap; @@ -220,17 +220,17 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem // package private for testing Netty4OpenChannelsHandler serverOpenChannels; - protected volatile HttpServerAdapter httpServerAdapter; private final Netty4CorsConfig corsConfig; public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool, - NamedXContentRegistry xContentRegistry) { + NamedXContentRegistry xContentRegistry, Dispatcher dispatcher) { super(settings); this.networkService = networkService; this.bigArrays = bigArrays; this.threadPool = threadPool; this.xContentRegistry = xContentRegistry; + this.dispatcher = dispatcher; ByteSizeValue maxContentLength = SETTING_HTTP_MAX_CONTENT_LENGTH.get(settings); this.maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings); @@ -286,11 +286,6 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem return this.settings; } - @Override - public void httpServerAdapter(HttpServerAdapter httpServerAdapter) { - this.httpServerAdapter = httpServerAdapter; - } - @Override protected void doStart() { boolean success = false; @@ -331,6 +326,9 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress); this.boundAddress = createBoundHttpAddress(); + if (logger.isInfoEnabled()) { + logger.info("{}", boundAddress); + } success = true; } finally { if (success == false) { @@ -511,7 +509,7 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem } protected void dispatchRequest(RestRequest request, RestChannel channel) { - httpServerAdapter.dispatchRequest(request, channel, threadPool.getThreadContext()); + dispatcher.dispatch(request, channel, threadPool.getThreadContext()); } protected void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java index 6a435c19efa..0516a449629 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java @@ -93,9 +93,12 @@ public class Netty4Plugin extends Plugin implements NetworkPlugin { @Override public Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays, - CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry, - NamedXContentRegistry xContentRegistry, NetworkService networkService) { + CircuitBreakerService circuitBreakerService, + NamedWriteableRegistry namedWriteableRegistry, + NamedXContentRegistry xContentRegistry, + NetworkService networkService, + HttpServerTransport.Dispatcher dispatcher) { return Collections.singletonMap(NETTY_HTTP_TRANSPORT_NAME, - () -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry)); + () -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher)); } } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java index 457b2242af4..c7427e717b3 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java @@ -188,7 +188,8 @@ public class Netty4HttpChannelTests extends ESTestCase { public void testHeadersSet() { Settings settings = Settings.builder().build(); try (Netty4HttpServerTransport httpServerTransport = - new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry())) { + new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry(), + (request, channel, context) -> {})) { httpServerTransport.start(); final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); httpRequest.headers().add(HttpHeaderNames.ORIGIN, "remote"); @@ -218,7 +219,8 @@ public class Netty4HttpChannelTests extends ESTestCase { public void testConnectionClose() throws Exception { final Settings settings = Settings.builder().build(); try (Netty4HttpServerTransport httpServerTransport = - new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry())) { + new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry(), + (request, channel, context) -> {})) { httpServerTransport.start(); final FullHttpRequest httpRequest; final boolean close = randomBoolean(); @@ -253,7 +255,8 @@ public class Netty4HttpChannelTests extends ESTestCase { private FullHttpResponse executeRequest(final Settings settings, final String originValue, final String host) { // construct request and send it over the transport layer try (Netty4HttpServerTransport httpServerTransport = - new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry())) { + new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry(), + (request, channel, context) -> {})) { httpServerTransport.start(); final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); if (originValue != null) { diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java index 5c7a249f74a..c0f8746d514 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java @@ -160,7 +160,7 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase { Netty4HttpServerPipeliningTests.this.networkService, Netty4HttpServerPipeliningTests.this.bigArrays, Netty4HttpServerPipeliningTests.this.threadPool, - xContentRegistry()); + xContentRegistry(), (request, channel, context) -> {}); } @Override diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java index 7481ba4c3a3..e3dd6d8a78e 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java @@ -121,9 +121,8 @@ public class Netty4HttpServerTransportTests extends ESTestCase { */ public void testExpectContinueHeader() throws Exception { try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(Settings.EMPTY, networkService, bigArrays, threadPool, - xContentRegistry())) { - transport.httpServerAdapter((request, channel, context) -> - channel.sendResponse(new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, new BytesArray("done")))); + xContentRegistry(), (request, channel, context) -> + channel.sendResponse(new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, new BytesArray("done"))))) { transport.start(); TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); @@ -145,12 +144,12 @@ public class Netty4HttpServerTransportTests extends ESTestCase { public void testBindUnavailableAddress() { try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(Settings.EMPTY, networkService, bigArrays, threadPool, - xContentRegistry())) { + xContentRegistry(), (request, channel, context) -> {})) { transport.start(); TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses()); Settings settings = Settings.builder().put("http.port", remoteAddress.getPort()).build(); try (Netty4HttpServerTransport otherTransport = new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, - xContentRegistry())) { + xContentRegistry(), (request, channel, context) -> {})) { BindHttpException bindHttpException = expectThrows(BindHttpException.class, () -> otherTransport.start()); assertEquals("Failed to bind to [" + remoteAddress.getPort() + "]", bindHttpException.getMessage()); } diff --git a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java index 6a2e6bacae4..8774ba5836b 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java +++ b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java @@ -29,18 +29,14 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockBigArrays; -import org.elasticsearch.discovery.zen.UnicastHostsProvider; -import org.elasticsearch.discovery.zen.ZenPing; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.recovery.RecoverySettings; -import org.elasticsearch.node.internal.InternalSettingsPreparer; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.MockSearchService; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.fetch.FetchPhase; -import org.elasticsearch.test.discovery.MockZenPing; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; diff --git a/test/framework/src/main/java/org/elasticsearch/test/AbstractQueryTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/AbstractQueryTestCase.java index 01bab59eb27..2a230106994 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/AbstractQueryTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/AbstractQueryTestCase.java @@ -81,7 +81,7 @@ import org.elasticsearch.indices.analysis.AnalysisModule; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.mapper.MapperRegistry; -import org.elasticsearch.node.internal.InternalSettingsPreparer; +import org.elasticsearch.node.InternalSettingsPreparer; import org.elasticsearch.plugins.MapperPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.PluginsService; diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 6c575126276..b4aee750a31 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -27,7 +27,6 @@ import com.carrotsearch.randomizedtesting.generators.RandomStrings; import org.apache.logging.log4j.Logger; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; @@ -85,7 +84,7 @@ import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.node.MockNode; import org.elasticsearch.node.Node; import org.elasticsearch.node.NodeValidationException; -import org.elasticsearch.node.service.NodeService; +import org.elasticsearch.node.NodeService; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchService;