diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml
index a3e5af6c4d4..a089d677dc9 100644
--- a/buildSrc/src/main/resources/checkstyle_suppressions.xml
+++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml
@@ -430,7 +430,6 @@
-
diff --git a/core/src/main/java/org/elasticsearch/action/ActionModule.java b/core/src/main/java/org/elasticsearch/action/ActionModule.java
index a24ed5f8083..0560b11cbb8 100644
--- a/core/src/main/java/org/elasticsearch/action/ActionModule.java
+++ b/core/src/main/java/org/elasticsearch/action/ActionModule.java
@@ -19,7 +19,6 @@
package org.elasticsearch.action;
-import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -196,6 +195,7 @@ import org.elasticsearch.action.termvectors.TransportShardMultiTermsVectorAction
import org.elasticsearch.action.termvectors.TransportTermVectorsAction;
import org.elasticsearch.action.update.TransportUpdateAction;
import org.elasticsearch.action.update.UpdateAction;
+import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.NamedRegistry;
import org.elasticsearch.common.inject.AbstractModule;
@@ -205,6 +205,7 @@ import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.ActionPlugin.ActionHandler;
import org.elasticsearch.rest.RestController;
@@ -332,7 +333,8 @@ public class ActionModule extends AbstractModule {
private final RestController restController;
public ActionModule(boolean transportClient, Settings settings, IndexNameExpressionResolver resolver,
- ClusterSettings clusterSettings, ThreadPool threadPool, List actionPlugins) {
+ ClusterSettings clusterSettings, ThreadPool threadPool, List actionPlugins,
+ NodeClient nodeClient, CircuitBreakerService circuitBreakerService) {
this.transportClient = transportClient;
this.settings = settings;
this.actionPlugins = actionPlugins;
@@ -352,9 +354,14 @@ public class ActionModule extends AbstractModule {
restWrapper = newRestWrapper;
}
}
- restController = new RestController(settings, headers, restWrapper);
+ if (transportClient) {
+ restController = null;
+ } else {
+ restController = new RestController(settings, headers, restWrapper, nodeClient, circuitBreakerService);
+ }
}
+
public Map> getActions() {
return actions;
}
@@ -648,8 +655,10 @@ public class ActionModule extends AbstractModule {
}
}
- // Bind the RestController which is required (by Node) even if rest isn't enabled.
- bind(RestController.class).toInstance(restController);
+ if (restController != null) {
+ // Bind the RestController which is required (by Node) even if rest isn't enabled.
+ bind(RestController.class).toInstance(restController);
+ }
// Setup the RestHandlers
if (NetworkModule.HTTP_ENABLED.get(settings)) {
diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java
index c26554b25e0..7d80b84d5d2 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/info/TransportNodesInfoAction.java
@@ -29,7 +29,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.node.service.NodeService;
+import org.elasticsearch.node.NodeService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java
index b4cef38d28d..e4034582f96 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java
@@ -29,7 +29,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.node.service.NodeService;
+import org.elasticsearch.node.NodeService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java
index 45eb83dd9e1..d77bc599258 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java
@@ -39,7 +39,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
-import org.elasticsearch.node.service.NodeService;
+import org.elasticsearch.node.NodeService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
diff --git a/core/src/main/java/org/elasticsearch/action/ingest/DeletePipelineTransportAction.java b/core/src/main/java/org/elasticsearch/action/ingest/DeletePipelineTransportAction.java
index 74ce894b053..45cb83634f8 100644
--- a/core/src/main/java/org/elasticsearch/action/ingest/DeletePipelineTransportAction.java
+++ b/core/src/main/java/org/elasticsearch/action/ingest/DeletePipelineTransportAction.java
@@ -30,7 +30,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.PipelineStore;
-import org.elasticsearch.node.service.NodeService;
+import org.elasticsearch.node.NodeService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
diff --git a/core/src/main/java/org/elasticsearch/action/ingest/GetPipelineTransportAction.java b/core/src/main/java/org/elasticsearch/action/ingest/GetPipelineTransportAction.java
index 8bac5c7b804..f64b36d47ae 100644
--- a/core/src/main/java/org/elasticsearch/action/ingest/GetPipelineTransportAction.java
+++ b/core/src/main/java/org/elasticsearch/action/ingest/GetPipelineTransportAction.java
@@ -30,7 +30,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.PipelineStore;
-import org.elasticsearch.node.service.NodeService;
+import org.elasticsearch.node.NodeService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
diff --git a/core/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java b/core/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java
index 82cd8d8eb7b..7dde9818049 100644
--- a/core/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java
+++ b/core/src/main/java/org/elasticsearch/action/ingest/PutPipelineTransportAction.java
@@ -36,7 +36,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.ingest.IngestInfo;
-import org.elasticsearch.node.service.NodeService;
+import org.elasticsearch.node.NodeService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
diff --git a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java
index 4f9a219c8ad..61fd400a1d3 100644
--- a/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java
+++ b/core/src/main/java/org/elasticsearch/action/ingest/SimulatePipelineTransportAction.java
@@ -19,7 +19,6 @@
package org.elasticsearch.action.ingest;
-import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
@@ -28,7 +27,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.ingest.PipelineStore;
-import org.elasticsearch.node.service.NodeService;
+import org.elasticsearch.node.NodeService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
diff --git a/core/src/main/java/org/elasticsearch/bootstrap/Bootstrap.java b/core/src/main/java/org/elasticsearch/bootstrap/Bootstrap.java
index 33f3f922fa4..2b47908c352 100644
--- a/core/src/main/java/org/elasticsearch/bootstrap/Bootstrap.java
+++ b/core/src/main/java/org/elasticsearch/bootstrap/Bootstrap.java
@@ -40,7 +40,6 @@ import org.elasticsearch.common.logging.LogConfigurator;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.IfConfig;
import org.elasticsearch.common.settings.KeyStoreWrapper;
-import org.elasticsearch.common.settings.SecureSetting;
import org.elasticsearch.common.settings.SecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
@@ -50,7 +49,7 @@ import org.elasticsearch.monitor.os.OsProbe;
import org.elasticsearch.monitor.process.ProcessProbe;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeValidationException;
-import org.elasticsearch.node.internal.InternalSettingsPreparer;
+import org.elasticsearch.node.InternalSettingsPreparer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
diff --git a/core/src/main/java/org/elasticsearch/cli/EnvironmentAwareCommand.java b/core/src/main/java/org/elasticsearch/cli/EnvironmentAwareCommand.java
index b19fc4ca957..8372a6b8ab8 100644
--- a/core/src/main/java/org/elasticsearch/cli/EnvironmentAwareCommand.java
+++ b/core/src/main/java/org/elasticsearch/cli/EnvironmentAwareCommand.java
@@ -24,7 +24,7 @@ import joptsimple.OptionSpec;
import joptsimple.util.KeyValuePair;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
-import org.elasticsearch.node.internal.InternalSettingsPreparer;
+import org.elasticsearch.node.InternalSettingsPreparer;
import java.util.HashMap;
import java.util.Locale;
diff --git a/core/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/core/src/main/java/org/elasticsearch/client/transport/TransportClient.java
index 803c0e6d1d1..51bed4a4582 100644
--- a/core/src/main/java/org/elasticsearch/client/transport/TransportClient.java
+++ b/core/src/main/java/org/elasticsearch/client/transport/TransportClient.java
@@ -46,7 +46,7 @@ import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.node.Node;
-import org.elasticsearch.node.internal.InternalSettingsPreparer;
+import org.elasticsearch.node.InternalSettingsPreparer;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.Plugin;
@@ -160,7 +160,7 @@ public abstract class TransportClient extends AbstractClient {
}
modules.add(b -> b.bind(ThreadPool.class).toInstance(threadPool));
ActionModule actionModule = new ActionModule(true, settings, null, settingsModule.getClusterSettings(),
- threadPool, pluginsService.filterPlugins(ActionPlugin.class));
+ threadPool, pluginsService.filterPlugins(ActionPlugin.class), null, null);
modules.add(actionModule);
CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(),
@@ -170,7 +170,7 @@ public abstract class TransportClient extends AbstractClient {
resourcesToClose.add(bigArrays);
modules.add(settingsModule);
NetworkModule networkModule = new NetworkModule(settings, true, pluginsService.filterPlugins(NetworkPlugin.class), threadPool,
- bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService);
+ bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null);
final Transport transport = networkModule.getTransportSupplier().get();
final TransportService transportService = new TransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(),
diff --git a/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java b/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java
index 04f6b62dde1..81d228da230 100644
--- a/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java
+++ b/core/src/main/java/org/elasticsearch/common/network/NetworkModule.java
@@ -38,6 +38,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry.FromXContent;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.plugins.NetworkPlugin;
+import org.elasticsearch.rest.RestController;
import org.elasticsearch.tasks.RawTaskStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
@@ -109,13 +110,13 @@ public final class NetworkModule {
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NamedXContentRegistry xContentRegistry,
- NetworkService networkService) {
+ NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) {
this.settings = settings;
this.transportClient = transportClient;
for (NetworkPlugin plugin : plugins) {
if (transportClient == false && HTTP_ENABLED.get(settings)) {
Map> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays,
- circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService);
+ circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, dispatcher);
for (Map.Entry> entry : httpTransportFactory.entrySet()) {
registerHttpTransport(entry.getKey(), entry.getValue());
}
diff --git a/core/src/main/java/org/elasticsearch/common/settings/SettingsModule.java b/core/src/main/java/org/elasticsearch/common/settings/SettingsModule.java
index 60276ce14f7..44d18208803 100644
--- a/core/src/main/java/org/elasticsearch/common/settings/SettingsModule.java
+++ b/core/src/main/java/org/elasticsearch/common/settings/SettingsModule.java
@@ -54,6 +54,7 @@ public class SettingsModule implements Module {
private final Logger logger;
private final IndexScopedSettings indexScopedSettings;
private final ClusterSettings clusterSettings;
+ private final SettingsFilter settingsFilter;
public SettingsModule(Settings settings, Setting>... additionalSettings) {
this(settings, Arrays.asList(additionalSettings), Collections.emptyList());
@@ -137,12 +138,13 @@ public class SettingsModule implements Module {
final Predicate acceptOnlyClusterSettings = TRIBE_CLIENT_NODE_SETTINGS_PREDICATE.negate();
clusterSettings.validate(settings.filter(acceptOnlyClusterSettings));
validateTribeSettings(settings, clusterSettings);
+ this.settingsFilter = new SettingsFilter(settings, settingsFilterPattern);
}
@Override
public void configure(Binder binder) {
binder.bind(Settings.class).toInstance(settings);
- binder.bind(SettingsFilter.class).toInstance(new SettingsFilter(settings, settingsFilterPattern));
+ binder.bind(SettingsFilter.class).toInstance(settingsFilter);
binder.bind(ClusterSettings.class).toInstance(clusterSettings);
binder.bind(IndexScopedSettings.class).toInstance(indexScopedSettings);
}
@@ -218,4 +220,6 @@ public class SettingsModule implements Module {
public ClusterSettings getClusterSettings() {
return clusterSettings;
}
+
+ public SettingsFilter getSettingsFilter() { return settingsFilter; }
}
diff --git a/core/src/main/java/org/elasticsearch/http/HttpServer.java b/core/src/main/java/org/elasticsearch/http/HttpServer.java
deleted file mode 100644
index 06bc392587a..00000000000
--- a/core/src/main/java/org/elasticsearch/http/HttpServer.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.http;
-
-import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.apache.logging.log4j.util.Supplier;
-import org.elasticsearch.client.node.NodeClient;
-import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.breaker.CircuitBreaker;
-import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.component.AbstractLifecycleComponent;
-import org.elasticsearch.common.io.Streams;
-import org.elasticsearch.common.io.stream.BytesStreamOutput;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.concurrent.ThreadContext;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.indices.breaker.CircuitBreakerService;
-import org.elasticsearch.rest.BytesRestResponse;
-import org.elasticsearch.rest.RestChannel;
-import org.elasticsearch.rest.RestController;
-import org.elasticsearch.rest.RestRequest;
-import org.elasticsearch.rest.RestResponse;
-import org.elasticsearch.rest.RestStatus;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import static org.elasticsearch.rest.RestStatus.FORBIDDEN;
-import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR;
-
-/**
- * A component to serve http requests, backed by rest handlers.
- */
-public class HttpServer extends AbstractLifecycleComponent implements HttpServerAdapter {
- private final HttpServerTransport transport;
-
- private final RestController restController;
-
- private final NodeClient client;
-
- private final CircuitBreakerService circuitBreakerService;
-
- public HttpServer(Settings settings, HttpServerTransport transport, RestController restController,
- NodeClient client, CircuitBreakerService circuitBreakerService) {
- super(settings);
- this.transport = transport;
- this.restController = restController;
- this.client = client;
- this.circuitBreakerService = circuitBreakerService;
- transport.httpServerAdapter(this);
- }
-
-
- @Override
- protected void doStart() {
- transport.start();
- if (logger.isInfoEnabled()) {
- logger.info("{}", transport.boundAddress());
- }
- }
-
- @Override
- protected void doStop() {
- transport.stop();
- }
-
- @Override
- protected void doClose() {
- transport.close();
- }
-
- public HttpInfo info() {
- return transport.info();
- }
-
- public HttpStats stats() {
- return transport.stats();
- }
-
- public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
- if (request.rawPath().equals("/favicon.ico")) {
- handleFavicon(request, channel);
- return;
- }
- RestChannel responseChannel = channel;
- try {
- int contentLength = request.content().length();
- if (restController.canTripCircuitBreaker(request)) {
- inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "");
- } else {
- inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
- }
- // iff we could reserve bytes for the request we need to send the response also over this channel
- responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
- restController.dispatchRequest(request, responseChannel, client, threadContext);
- } catch (Exception e) {
- try {
- responseChannel.sendResponse(new BytesRestResponse(channel, e));
- } catch (Exception inner) {
- inner.addSuppressed(e);
- logger.error((Supplier>) () ->
- new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner);
- }
- }
- }
-
- void handleFavicon(RestRequest request, RestChannel channel) {
- if (request.method() == RestRequest.Method.GET) {
- try {
- try (InputStream stream = getClass().getResourceAsStream("/config/favicon.ico")) {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- Streams.copy(stream, out);
- BytesRestResponse restResponse = new BytesRestResponse(RestStatus.OK, "image/x-icon", out.toByteArray());
- channel.sendResponse(restResponse);
- }
- } catch (IOException e) {
- channel.sendResponse(new BytesRestResponse(INTERNAL_SERVER_ERROR, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY));
- }
- } else {
- channel.sendResponse(new BytesRestResponse(FORBIDDEN, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY));
- }
- }
-
- private static final class ResourceHandlingHttpChannel implements RestChannel {
- private final RestChannel delegate;
- private final CircuitBreakerService circuitBreakerService;
- private final int contentLength;
- private final AtomicBoolean closed = new AtomicBoolean();
-
- public ResourceHandlingHttpChannel(RestChannel delegate, CircuitBreakerService circuitBreakerService, int contentLength) {
- this.delegate = delegate;
- this.circuitBreakerService = circuitBreakerService;
- this.contentLength = contentLength;
- }
-
- @Override
- public XContentBuilder newBuilder() throws IOException {
- return delegate.newBuilder();
- }
-
- @Override
- public XContentBuilder newErrorBuilder() throws IOException {
- return delegate.newErrorBuilder();
- }
-
- @Override
- public XContentBuilder newBuilder(@Nullable BytesReference autoDetectSource, boolean useFiltering) throws IOException {
- return delegate.newBuilder(autoDetectSource, useFiltering);
- }
-
- @Override
- public BytesStreamOutput bytesOutput() {
- return delegate.bytesOutput();
- }
-
- @Override
- public RestRequest request() {
- return delegate.request();
- }
-
- @Override
- public boolean detailedErrorsEnabled() {
- return delegate.detailedErrorsEnabled();
- }
-
- @Override
- public void sendResponse(RestResponse response) {
- close();
- delegate.sendResponse(response);
- }
-
- private void close() {
- // attempt to close once atomically
- if (closed.compareAndSet(false, true) == false) {
- throw new IllegalStateException("Channel is already closed");
- }
- inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(-contentLength);
- }
-
- }
-
- private static CircuitBreaker inFlightRequestsBreaker(CircuitBreakerService circuitBreakerService) {
- // We always obtain a fresh breaker to reflect changes to the breaker configuration.
- return circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
- }
-}
diff --git a/core/src/main/java/org/elasticsearch/http/HttpServerAdapter.java b/core/src/main/java/org/elasticsearch/http/HttpServerAdapter.java
deleted file mode 100644
index a7e61143893..00000000000
--- a/core/src/main/java/org/elasticsearch/http/HttpServerAdapter.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.http;
-
-import org.elasticsearch.common.util.concurrent.ThreadContext;
-import org.elasticsearch.rest.RestChannel;
-import org.elasticsearch.rest.RestRequest;
-
-public interface HttpServerAdapter {
-
- void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext context);
-
-}
diff --git a/core/src/main/java/org/elasticsearch/http/HttpServerTransport.java b/core/src/main/java/org/elasticsearch/http/HttpServerTransport.java
index 4dc4a888d8a..89c04198e7f 100644
--- a/core/src/main/java/org/elasticsearch/http/HttpServerTransport.java
+++ b/core/src/main/java/org/elasticsearch/http/HttpServerTransport.java
@@ -21,6 +21,9 @@ package org.elasticsearch.http;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.transport.BoundTransportAddress;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.rest.RestChannel;
+import org.elasticsearch.rest.RestRequest;
public interface HttpServerTransport extends LifecycleComponent {
@@ -33,6 +36,15 @@ public interface HttpServerTransport extends LifecycleComponent {
HttpStats stats();
- void httpServerAdapter(HttpServerAdapter httpServerAdapter);
-
+ @FunctionalInterface
+ interface Dispatcher {
+ /**
+ * Dispatches the {@link RestRequest} to the relevant request handler or responds to the given rest channel directly if
+ * the request can't be handled by any request handler.
+ * @param request the request to dispatch
+ * @param channel the response channel of this request
+ * @param threadContext the nodes thread context
+ */
+ void dispatch(RestRequest request, RestChannel channel, ThreadContext threadContext);
+ }
}
diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogToolCli.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogToolCli.java
index 3b77466a916..944296d6813 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/TranslogToolCli.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogToolCli.java
@@ -21,10 +21,6 @@ package org.elasticsearch.index.translog;
import org.elasticsearch.cli.MultiCommand;
import org.elasticsearch.cli.Terminal;
-import org.elasticsearch.common.logging.LogConfigurator;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.env.Environment;
-import org.elasticsearch.node.internal.InternalSettingsPreparer;
/**
* Class encapsulating and dispatching commands from the {@code elasticsearch-translog} command line tool
diff --git a/core/src/main/java/org/elasticsearch/node/internal/InternalSettingsPreparer.java b/core/src/main/java/org/elasticsearch/node/InternalSettingsPreparer.java
similarity index 96%
rename from core/src/main/java/org/elasticsearch/node/internal/InternalSettingsPreparer.java
rename to core/src/main/java/org/elasticsearch/node/InternalSettingsPreparer.java
index 840378ffd08..1b3ffe4327b 100644
--- a/core/src/main/java/org/elasticsearch/node/internal/InternalSettingsPreparer.java
+++ b/core/src/main/java/org/elasticsearch/node/InternalSettingsPreparer.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.elasticsearch.node.internal;
+package org.elasticsearch.node;
import java.io.IOException;
import java.nio.file.Files;
@@ -106,7 +106,8 @@ public class InternalSettingsPreparer {
}
}
if (foundSuffixes.size() > 1) {
- throw new SettingsException("multiple settings files found with suffixes: " + Strings.collectionToDelimitedString(foundSuffixes, ","));
+ throw new SettingsException("multiple settings files found with suffixes: "
+ + Strings.collectionToDelimitedString(foundSuffixes, ","));
}
// re-initialize settings now that the config file has been loaded
@@ -195,7 +196,9 @@ public class InternalSettingsPreparer {
private static String promptForValue(String key, Terminal terminal, boolean secret) {
if (terminal == null) {
- throw new UnsupportedOperationException("found property [" + key + "] with value [" + (secret ? SECRET_PROMPT_VALUE : TEXT_PROMPT_VALUE) +"]. prompting for property values is only supported when running elasticsearch in the foreground");
+ throw new UnsupportedOperationException("found property [" + key + "] with value ["
+ + (secret ? SECRET_PROMPT_VALUE : TEXT_PROMPT_VALUE)
+ + "]. prompting for property values is only supported when running elasticsearch in the foreground");
}
if (secret) {
diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java
index 3fcda6c0a3b..97ab20c7767 100644
--- a/core/src/main/java/org/elasticsearch/node/Node.java
+++ b/core/src/main/java/org/elasticsearch/node/Node.java
@@ -84,7 +84,6 @@ import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.gateway.GatewayModule;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.gateway.MetaStateService;
-import org.elasticsearch.http.HttpServer;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.indices.IndicesModule;
@@ -101,8 +100,6 @@ import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.monitor.jvm.JvmInfo;
-import org.elasticsearch.node.internal.InternalSettingsPreparer;
-import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.AnalysisPlugin;
import org.elasticsearch.plugins.ClusterPlugin;
@@ -117,6 +114,7 @@ import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.repositories.RepositoriesModule;
+import org.elasticsearch.rest.RestController;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchModule;
@@ -342,14 +340,18 @@ public class Node implements Closeable {
modules.add(clusterModule);
IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
modules.add(indicesModule);
+
SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class));
- ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
- settingsModule.getClusterSettings(), threadPool, pluginsService.filterPlugins(ActionPlugin.class));
- modules.add(actionModule);
- modules.add(new GatewayModule());
CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(),
settingsModule.getClusterSettings());
resourcesToClose.add(circuitBreakerService);
+ ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
+ settingsModule.getClusterSettings(), threadPool, pluginsService.filterPlugins(ActionPlugin.class), client,
+ circuitBreakerService);
+ modules.add(actionModule);
+ modules.add(new GatewayModule());
+
+
BigArrays bigArrays = createBigArrays(settings, circuitBreakerService);
resourcesToClose.add(bigArrays);
modules.add(settingsModule);
@@ -388,30 +390,35 @@ public class Node implements Closeable {
pluginsService.filterPlugins(Plugin.class).stream()
.map(Plugin::getCustomMetaDataUpgrader)
.collect(Collectors.toList());
+ final RestController restController = actionModule.getRestController();
final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
- threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService);
+ threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService,
+ restController::dispatchRequest);
final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders);
final Transport transport = networkModule.getTransportSupplier().get();
final TransportService transportService = newTransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings());
final Consumer httpBind;
+ final HttpServerTransport httpServerTransport;
if (networkModule.isHttpEnabled()) {
- HttpServerTransport httpServerTransport = networkModule.getHttpServerTransportSupplier().get();
- HttpServer httpServer = new HttpServer(settings, httpServerTransport, actionModule.getRestController(), client,
- circuitBreakerService);
+ httpServerTransport = networkModule.getHttpServerTransportSupplier().get();
httpBind = b -> {
- b.bind(HttpServer.class).toInstance(httpServer);
b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
};
} else {
httpBind = b -> {
- b.bind(HttpServer.class).toProvider(Providers.of(null));
+ b.bind(HttpServerTransport.class).toProvider(Providers.of(null));
};
+ httpServerTransport = null;
}
-
final DiscoveryModule discoveryModule = new DiscoveryModule(this.settings, threadPool, transportService,
namedWriteableRegistry, networkService, clusterService, pluginsService.filterPlugins(DiscoveryPlugin.class));
+ NodeService nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
+ transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),
+ httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter());
+
modules.add(b -> {
+ b.bind(NodeService.class).toInstance(nodeService);
b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
b.bind(PluginsService.class).toInstance(pluginsService);
b.bind(Client.class).toInstance(client);
@@ -628,7 +635,7 @@ public class Node implements Closeable {
}
if (NetworkModule.HTTP_ENABLED.get(settings)) {
- injector.getInstance(HttpServer.class).start();
+ injector.getInstance(HttpServerTransport.class).start();
}
// start nodes now, after the http server, because it may take some time
@@ -658,7 +665,7 @@ public class Node implements Closeable {
injector.getInstance(TribeService.class).stop();
injector.getInstance(ResourceWatcherService.class).stop();
if (NetworkModule.HTTP_ENABLED.get(settings)) {
- injector.getInstance(HttpServer.class).stop();
+ injector.getInstance(HttpServerTransport.class).stop();
}
injector.getInstance(SnapshotsService.class).stop();
@@ -708,7 +715,7 @@ public class Node implements Closeable {
toClose.add(injector.getInstance(NodeService.class));
toClose.add(() -> stopWatch.stop().start("http"));
if (NetworkModule.HTTP_ENABLED.get(settings)) {
- toClose.add(injector.getInstance(HttpServer.class));
+ toClose.add(injector.getInstance(HttpServerTransport.class));
}
toClose.add(() -> stopWatch.stop().start("snapshot_service"));
toClose.add(injector.getInstance(SnapshotsService.class));
diff --git a/core/src/main/java/org/elasticsearch/node/NodeModule.java b/core/src/main/java/org/elasticsearch/node/NodeModule.java
index 6a8f8b90681..929e889503e 100644
--- a/core/src/main/java/org/elasticsearch/node/NodeModule.java
+++ b/core/src/main/java/org/elasticsearch/node/NodeModule.java
@@ -22,7 +22,6 @@ package org.elasticsearch.node;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.monitor.MonitorService;
-import org.elasticsearch.node.service.NodeService;
public class NodeModule extends AbstractModule {
@@ -38,7 +37,6 @@ public class NodeModule extends AbstractModule {
protected void configure() {
bind(Node.class).toInstance(node);
bind(MonitorService.class).toInstance(monitorService);
- bind(NodeService.class).asEagerSingleton();
bind(DiskThresholdMonitor.class).asEagerSingleton();
}
}
diff --git a/core/src/main/java/org/elasticsearch/node/service/NodeService.java b/core/src/main/java/org/elasticsearch/node/NodeService.java
similarity index 88%
rename from core/src/main/java/org/elasticsearch/node/service/NodeService.java
rename to core/src/main/java/org/elasticsearch/node/NodeService.java
index 7d9a148b271..cb245487152 100644
--- a/core/src/main/java/org/elasticsearch/node/service/NodeService.java
+++ b/core/src/main/java/org/elasticsearch/node/NodeService.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.elasticsearch.node.service;
+package org.elasticsearch.node;
import org.elasticsearch.Build;
import org.elasticsearch.Version;
@@ -27,11 +27,10 @@ import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
-import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.discovery.Discovery;
-import org.elasticsearch.http.HttpServer;
+import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.ingest.IngestService;
@@ -55,17 +54,16 @@ public class NodeService extends AbstractComponent implements Closeable {
private final IngestService ingestService;
private final SettingsFilter settingsFilter;
private ScriptService scriptService;
+ private final HttpServerTransport httpServerTransport;
- @Nullable
- private final HttpServer httpServer;
private final Discovery discovery;
- @Inject
- public NodeService(Settings settings, ThreadPool threadPool, MonitorService monitorService, Discovery discovery,
+ NodeService(Settings settings, ThreadPool threadPool, MonitorService monitorService, Discovery discovery,
TransportService transportService, IndicesService indicesService, PluginsService pluginService,
- CircuitBreakerService circuitBreakerService, ScriptService scriptService, @Nullable HttpServer httpServer,
- IngestService ingestService, ClusterService clusterService, SettingsFilter settingsFilter) {
+ CircuitBreakerService circuitBreakerService, ScriptService scriptService,
+ @Nullable HttpServerTransport httpServerTransport, IngestService ingestService, ClusterService clusterService,
+ SettingsFilter settingsFilter) {
super(settings);
this.threadPool = threadPool;
this.monitorService = monitorService;
@@ -74,7 +72,7 @@ public class NodeService extends AbstractComponent implements Closeable {
this.discovery = discovery;
this.pluginService = pluginService;
this.circuitBreakerService = circuitBreakerService;
- this.httpServer = httpServer;
+ this.httpServerTransport = httpServerTransport;
this.ingestService = ingestService;
this.settingsFilter = settingsFilter;
this.scriptService = scriptService;
@@ -91,7 +89,7 @@ public class NodeService extends AbstractComponent implements Closeable {
jvm ? monitorService.jvmService().info() : null,
threadPool ? this.threadPool.info() : null,
transport ? transportService.info() : null,
- http ? (httpServer == null ? null : httpServer.info()) : null,
+ http ? (httpServerTransport == null ? null : httpServerTransport.info()) : null,
plugin ? (pluginService == null ? null : pluginService.info()) : null,
ingest ? (ingestService == null ? null : ingestService.info()) : null,
indices ? indicesService.getTotalIndexingBufferBytes() : null
@@ -111,7 +109,7 @@ public class NodeService extends AbstractComponent implements Closeable {
threadPool ? this.threadPool.stats() : null,
fs ? monitorService.fsService().stats() : null,
transport ? transportService.stats() : null,
- http ? (httpServer == null ? null : httpServer.stats()) : null,
+ http ? (httpServerTransport == null ? null : httpServerTransport.stats()) : null,
circuitBreaker ? circuitBreakerService.stats() : null,
script ? scriptService.stats() : null,
discoveryStats ? discovery.stats() : null,
diff --git a/core/src/main/java/org/elasticsearch/plugins/InstallPluginCommand.java b/core/src/main/java/org/elasticsearch/plugins/InstallPluginCommand.java
index dea6fcba312..b502b2a4016 100644
--- a/core/src/main/java/org/elasticsearch/plugins/InstallPluginCommand.java
+++ b/core/src/main/java/org/elasticsearch/plugins/InstallPluginCommand.java
@@ -33,9 +33,7 @@ import org.elasticsearch.cli.UserException;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.common.io.FileSystemUtils;
-import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
-import org.elasticsearch.node.internal.InternalSettingsPreparer;
import java.io.BufferedReader;
import java.io.IOException;
@@ -63,7 +61,6 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
-import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
diff --git a/core/src/main/java/org/elasticsearch/plugins/ListPluginsCommand.java b/core/src/main/java/org/elasticsearch/plugins/ListPluginsCommand.java
index 3f21c44a8f4..a674e7c6e24 100644
--- a/core/src/main/java/org/elasticsearch/plugins/ListPluginsCommand.java
+++ b/core/src/main/java/org/elasticsearch/plugins/ListPluginsCommand.java
@@ -22,9 +22,7 @@ package org.elasticsearch.plugins;
import joptsimple.OptionSet;
import org.elasticsearch.cli.EnvironmentAwareCommand;
import org.elasticsearch.cli.Terminal;
-import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
-import org.elasticsearch.node.internal.InternalSettingsPreparer;
import java.io.IOException;
import java.nio.file.DirectoryStream;
@@ -33,7 +31,6 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
/**
* A command for the plugin cli to list plugins installed in elasticsearch.
diff --git a/core/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java b/core/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java
index ceb7e077e11..33fab61c24a 100644
--- a/core/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java
+++ b/core/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java
@@ -69,8 +69,11 @@ public interface NetworkPlugin {
* See {@link org.elasticsearch.common.network.NetworkModule#HTTP_TYPE_SETTING} to configure a specific implementation.
*/
default Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
- CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
- NamedXContentRegistry xContentRegistry, NetworkService networkService) {
+ CircuitBreakerService circuitBreakerService,
+ NamedWriteableRegistry namedWriteableRegistry,
+ NamedXContentRegistry xContentRegistry,
+ NetworkService networkService,
+ HttpServerTransport.Dispatcher dispatcher) {
return Collections.emptyMap();
}
}
diff --git a/core/src/main/java/org/elasticsearch/rest/RestController.java b/core/src/main/java/org/elasticsearch/rest/RestController.java
index c701e8ff0ee..5ac82b7e454 100644
--- a/core/src/main/java/org/elasticsearch/rest/RestController.java
+++ b/core/src/main/java/org/elasticsearch/rest/RestController.java
@@ -19,23 +19,35 @@
package org.elasticsearch.rest;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.UnaryOperator;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
+import org.elasticsearch.common.io.Streams;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.path.PathTrie;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.http.HttpServerTransport;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
import static org.elasticsearch.rest.RestStatus.BAD_REQUEST;
+import static org.elasticsearch.rest.RestStatus.FORBIDDEN;
+import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static org.elasticsearch.rest.RestStatus.OK;
public class RestController extends AbstractComponent {
@@ -48,18 +60,27 @@ public class RestController extends AbstractComponent {
private final UnaryOperator handlerWrapper;
+ private final NodeClient client;
+
+ private final CircuitBreakerService circuitBreakerService;
+
/** Rest headers that are copied to internal requests made during a rest request. */
private final Set headersToCopy;
- public RestController(Settings settings, Set headersToCopy, UnaryOperator handlerWrapper) {
+ public RestController(Settings settings, Set headersToCopy, UnaryOperator handlerWrapper,
+ NodeClient client, CircuitBreakerService circuitBreakerService) {
super(settings);
this.headersToCopy = headersToCopy;
if (handlerWrapper == null) {
handlerWrapper = h -> h; // passthrough if no wrapper set
}
this.handlerWrapper = handlerWrapper;
+ this.client = client;
+ this.circuitBreakerService = circuitBreakerService;
}
+
+
/**
* Registers a REST handler to be executed when the provided {@code method} and {@code path} match the request.
*
@@ -137,7 +158,34 @@ public class RestController extends AbstractComponent {
return (handler != null) ? handler.canTripCircuitBreaker() : true;
}
- public void dispatchRequest(final RestRequest request, final RestChannel channel, final NodeClient client, ThreadContext threadContext) throws Exception {
+ public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
+ if (request.rawPath().equals("/favicon.ico")) {
+ handleFavicon(request, channel);
+ return;
+ }
+ RestChannel responseChannel = channel;
+ try {
+ int contentLength = request.content().length();
+ if (canTripCircuitBreaker(request)) {
+ inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "");
+ } else {
+ inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
+ }
+ // iff we could reserve bytes for the request we need to send the response also over this channel
+ responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
+ dispatchRequest(request, responseChannel, client, threadContext);
+ } catch (Exception e) {
+ try {
+ responseChannel.sendResponse(new BytesRestResponse(channel, e));
+ } catch (Exception inner) {
+ inner.addSuppressed(e);
+ logger.error((Supplier>) () ->
+ new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner);
+ }
+ }
+ }
+
+ void dispatchRequest(final RestRequest request, final RestChannel channel, final NodeClient client, ThreadContext threadContext) throws Exception {
if (!checkRequestParameters(request, channel)) {
return;
}
@@ -223,4 +271,84 @@ public class RestController extends AbstractComponent {
// my_index/my_type/http%3A%2F%2Fwww.google.com
return request.rawPath();
}
+
+ void handleFavicon(RestRequest request, RestChannel channel) {
+ if (request.method() == RestRequest.Method.GET) {
+ try {
+ try (InputStream stream = getClass().getResourceAsStream("/config/favicon.ico")) {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ Streams.copy(stream, out);
+ BytesRestResponse restResponse = new BytesRestResponse(RestStatus.OK, "image/x-icon", out.toByteArray());
+ channel.sendResponse(restResponse);
+ }
+ } catch (IOException e) {
+ channel.sendResponse(new BytesRestResponse(INTERNAL_SERVER_ERROR, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY));
+ }
+ } else {
+ channel.sendResponse(new BytesRestResponse(FORBIDDEN, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY));
+ }
+ }
+
+ private static final class ResourceHandlingHttpChannel implements RestChannel {
+ private final RestChannel delegate;
+ private final CircuitBreakerService circuitBreakerService;
+ private final int contentLength;
+ private final AtomicBoolean closed = new AtomicBoolean();
+
+ public ResourceHandlingHttpChannel(RestChannel delegate, CircuitBreakerService circuitBreakerService, int contentLength) {
+ this.delegate = delegate;
+ this.circuitBreakerService = circuitBreakerService;
+ this.contentLength = contentLength;
+ }
+
+ @Override
+ public XContentBuilder newBuilder() throws IOException {
+ return delegate.newBuilder();
+ }
+
+ @Override
+ public XContentBuilder newErrorBuilder() throws IOException {
+ return delegate.newErrorBuilder();
+ }
+
+ @Override
+ public XContentBuilder newBuilder(@Nullable BytesReference autoDetectSource, boolean useFiltering) throws IOException {
+ return delegate.newBuilder(autoDetectSource, useFiltering);
+ }
+
+ @Override
+ public BytesStreamOutput bytesOutput() {
+ return delegate.bytesOutput();
+ }
+
+ @Override
+ public RestRequest request() {
+ return delegate.request();
+ }
+
+ @Override
+ public boolean detailedErrorsEnabled() {
+ return delegate.detailedErrorsEnabled();
+ }
+
+ @Override
+ public void sendResponse(RestResponse response) {
+ close();
+ delegate.sendResponse(response);
+ }
+
+ private void close() {
+ // attempt to close once atomically
+ if (closed.compareAndSet(false, true) == false) {
+ throw new IllegalStateException("Channel is already closed");
+ }
+ inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(-contentLength);
+ }
+
+ }
+
+ private static CircuitBreaker inFlightRequestsBreaker(CircuitBreakerService circuitBreakerService) {
+ // We always obtain a fresh breaker to reflect changes to the breaker configuration.
+ return circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
+ }
}
diff --git a/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java b/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java
index 11799c99cb1..3476c99d484 100644
--- a/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java
+++ b/core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java
@@ -30,7 +30,6 @@ import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.http.HttpInfo;
-import org.elasticsearch.http.HttpServerAdapter;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
@@ -89,8 +88,6 @@ public class NetworkModuleTests extends ModuleTestCase {
public HttpStats stats() {
return null;
}
- @Override
- public void httpServerAdapter(HttpServerAdapter httpServerAdapter) {}
}
@@ -155,7 +152,8 @@ public class NetworkModuleTests extends ModuleTestCase {
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NamedXContentRegistry xContentRegistry,
- NetworkService networkService) {
+ NetworkService networkService,
+ HttpServerTransport.Dispatcher requestDispatcher) {
return Collections.singletonMap("custom", custom);
}
});
@@ -195,7 +193,8 @@ public class NetworkModuleTests extends ModuleTestCase {
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NamedXContentRegistry xContentRegistry,
- NetworkService networkService) {
+ NetworkService networkService,
+ HttpServerTransport.Dispatcher requestDispatcher) {
Map> supplierMap = new HashMap<>();
supplierMap.put("custom", custom);
supplierMap.put("default_custom", def);
@@ -228,7 +227,8 @@ public class NetworkModuleTests extends ModuleTestCase {
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NamedXContentRegistry xContentRegistry,
- NetworkService networkService) {
+ NetworkService networkService,
+ HttpServerTransport.Dispatcher requestDispatcher) {
Map> supplierMap = new HashMap<>();
supplierMap.put("custom", custom);
supplierMap.put("default_custom", def);
@@ -276,6 +276,7 @@ public class NetworkModuleTests extends ModuleTestCase {
}
private NetworkModule newNetworkModule(Settings settings, boolean transportClient, NetworkPlugin... plugins) {
- return new NetworkModule(settings, transportClient, Arrays.asList(plugins), threadPool, null, null, null, xContentRegistry(), null);
+ return new NetworkModule(settings, transportClient, Arrays.asList(plugins), threadPool, null, null, null, xContentRegistry(), null,
+ (a, b, c) -> {});
}
}
diff --git a/core/src/test/java/org/elasticsearch/http/HttpServerTests.java b/core/src/test/java/org/elasticsearch/http/HttpServerTests.java
deleted file mode 100644
index db9eebfe5fc..00000000000
--- a/core/src/test/java/org/elasticsearch/http/HttpServerTests.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.elasticsearch.http;
-
-import java.util.Collections;
-import java.util.Map;
-
-import org.elasticsearch.common.breaker.CircuitBreaker;
-import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.component.AbstractLifecycleComponent;
-import org.elasticsearch.common.settings.ClusterSettings;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.BoundTransportAddress;
-import org.elasticsearch.common.transport.TransportAddress;
-import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.util.concurrent.ThreadContext;
-import org.elasticsearch.common.xcontent.NamedXContentRegistry;
-import org.elasticsearch.indices.breaker.CircuitBreakerService;
-import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
-import org.elasticsearch.rest.AbstractRestChannel;
-import org.elasticsearch.rest.BytesRestResponse;
-import org.elasticsearch.rest.RestController;
-import org.elasticsearch.rest.RestRequest;
-import org.elasticsearch.rest.RestResponse;
-import org.elasticsearch.rest.RestStatus;
-import org.elasticsearch.test.ESTestCase;
-import org.junit.Before;
-
-public class HttpServerTests extends ESTestCase {
- private static final ByteSizeValue BREAKER_LIMIT = new ByteSizeValue(20);
- private HttpServer httpServer;
- private CircuitBreaker inFlightRequestsBreaker;
-
- @Before
- public void setup() {
- Settings settings = Settings.EMPTY;
- CircuitBreakerService circuitBreakerService = new HierarchyCircuitBreakerService(
- Settings.builder()
- .put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), BREAKER_LIMIT)
- .build(),
- new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
- // we can do this here only because we know that we don't adjust breaker settings dynamically in the test
- inFlightRequestsBreaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
-
- HttpServerTransport httpServerTransport = new TestHttpServerTransport();
- RestController restController = new RestController(settings, Collections.emptySet(), null);
- restController.registerHandler(RestRequest.Method.GET, "/",
- (request, channel, client) -> channel.sendResponse(
- new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY)));
- restController.registerHandler(RestRequest.Method.GET, "/error", (request, channel, client) -> {
- throw new IllegalArgumentException("test error");
- });
-
- httpServer = new HttpServer(settings, httpServerTransport, restController, null, circuitBreakerService);
- httpServer.start();
- }
-
- public void testDispatchRequestAddsAndFreesBytesOnSuccess() {
- int contentLength = BREAKER_LIMIT.bytesAsInt();
- String content = randomAsciiOfLength(contentLength);
- TestRestRequest request = new TestRestRequest("/", content);
- AssertingChannel channel = new AssertingChannel(request, true, RestStatus.OK);
-
- httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY));
-
- assertEquals(0, inFlightRequestsBreaker.getTrippedCount());
- assertEquals(0, inFlightRequestsBreaker.getUsed());
- }
-
- public void testDispatchRequestAddsAndFreesBytesOnError() {
- int contentLength = BREAKER_LIMIT.bytesAsInt();
- String content = randomAsciiOfLength(contentLength);
- TestRestRequest request = new TestRestRequest("/error", content);
- AssertingChannel channel = new AssertingChannel(request, true, RestStatus.BAD_REQUEST);
-
- httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY));
-
- assertEquals(0, inFlightRequestsBreaker.getTrippedCount());
- assertEquals(0, inFlightRequestsBreaker.getUsed());
- }
-
- public void testDispatchRequestAddsAndFreesBytesOnlyOnceOnError() {
- int contentLength = BREAKER_LIMIT.bytesAsInt();
- String content = randomAsciiOfLength(contentLength);
- // we will produce an error in the rest handler and one more when sending the error response
- TestRestRequest request = new TestRestRequest("/error", content);
- ExceptionThrowingChannel channel = new ExceptionThrowingChannel(request, true);
-
- httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY));
-
- assertEquals(0, inFlightRequestsBreaker.getTrippedCount());
- assertEquals(0, inFlightRequestsBreaker.getUsed());
- }
-
- public void testDispatchRequestLimitsBytes() {
- int contentLength = BREAKER_LIMIT.bytesAsInt() + 1;
- String content = randomAsciiOfLength(contentLength);
- TestRestRequest request = new TestRestRequest("/", content);
- AssertingChannel channel = new AssertingChannel(request, true, RestStatus.SERVICE_UNAVAILABLE);
-
- httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY));
-
- assertEquals(1, inFlightRequestsBreaker.getTrippedCount());
- assertEquals(0, inFlightRequestsBreaker.getUsed());
- }
-
- private static final class TestHttpServerTransport extends AbstractLifecycleComponent implements
- HttpServerTransport {
-
- public TestHttpServerTransport() {
- super(Settings.EMPTY);
- }
-
- @Override
- protected void doStart() {
- }
-
- @Override
- protected void doStop() {
- }
-
- @Override
- protected void doClose() {
- }
-
- @Override
- public BoundTransportAddress boundAddress() {
- TransportAddress transportAddress = buildNewFakeTransportAddress();
- return new BoundTransportAddress(new TransportAddress[] {transportAddress} ,transportAddress);
- }
-
- @Override
- public HttpInfo info() {
- return null;
- }
-
- @Override
- public HttpStats stats() {
- return null;
- }
-
- @Override
- public void httpServerAdapter(HttpServerAdapter httpServerAdapter) {
-
- }
- }
-
- private static final class AssertingChannel extends AbstractRestChannel {
- private final RestStatus expectedStatus;
-
- protected AssertingChannel(RestRequest request, boolean detailedErrorsEnabled, RestStatus expectedStatus) {
- super(request, detailedErrorsEnabled);
- this.expectedStatus = expectedStatus;
- }
-
- @Override
- public void sendResponse(RestResponse response) {
- assertEquals(expectedStatus, response.status());
- }
- }
-
- private static final class ExceptionThrowingChannel extends AbstractRestChannel {
-
- protected ExceptionThrowingChannel(RestRequest request, boolean detailedErrorsEnabled) {
- super(request, detailedErrorsEnabled);
- }
-
- @Override
- public void sendResponse(RestResponse response) {
- throw new IllegalStateException("always throwing an exception for testing");
- }
- }
-
- private static final class TestRestRequest extends RestRequest {
-
- private final BytesReference content;
-
- private TestRestRequest(String path, String content) {
- super(NamedXContentRegistry.EMPTY, Collections.emptyMap(), path);
- this.content = new BytesArray(content);
- }
-
- @Override
- public Method method() {
- return Method.GET;
- }
-
- @Override
- public String uri() {
- return null;
- }
-
- @Override
- public boolean hasContent() {
- return true;
- }
-
- @Override
- public BytesReference content() {
- return content;
- }
-
- @Override
- public String header(String name) {
- return null;
- }
-
- @Override
- public Iterable> headers() {
- return null;
- }
-
- }
-}
diff --git a/core/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java b/core/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java
index 02fe0d03c77..83072636b76 100644
--- a/core/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java
+++ b/core/src/test/java/org/elasticsearch/ingest/IngestProcessorNotInstalledOnAllNodesIT.java
@@ -22,7 +22,7 @@ package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ingest.WritePipelineResponse;
import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.node.service.NodeService;
+import org.elasticsearch.node.NodeService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
diff --git a/core/src/test/java/org/elasticsearch/node/internal/InternalSettingsPreparerTests.java b/core/src/test/java/org/elasticsearch/node/internal/InternalSettingsPreparerTests.java
index 2dc95f8e9f1..94b3f3737cb 100644
--- a/core/src/test/java/org/elasticsearch/node/internal/InternalSettingsPreparerTests.java
+++ b/core/src/test/java/org/elasticsearch/node/internal/InternalSettingsPreparerTests.java
@@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.env.Environment;
+import org.elasticsearch.node.InternalSettingsPreparer;
import org.elasticsearch.test.ESTestCase;
import org.junit.After;
import org.junit.Before;
diff --git a/core/src/test/java/org/elasticsearch/rest/RestControllerTests.java b/core/src/test/java/org/elasticsearch/rest/RestControllerTests.java
index cce5c463759..e7064554908 100644
--- a/core/src/test/java/org/elasticsearch/rest/RestControllerTests.java
+++ b/core/src/test/java/org/elasticsearch/rest/RestControllerTests.java
@@ -29,11 +29,26 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.UnaryOperator;
import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.common.breaker.CircuitBreaker;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.logging.DeprecationLogger;
+import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.BoundTransportAddress;
+import org.elasticsearch.common.transport.TransportAddress;
+import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.http.HttpInfo;
+import org.elasticsearch.http.HttpServerTransport;
+import org.elasticsearch.http.HttpStats;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
+import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.FakeRestRequest;
+import org.junit.Before;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
@@ -43,10 +58,39 @@ import static org.mockito.Mockito.verify;
public class RestControllerTests extends ESTestCase {
+ private static final ByteSizeValue BREAKER_LIMIT = new ByteSizeValue(20);
+ private CircuitBreaker inFlightRequestsBreaker;
+ private RestController restController;
+ private HierarchyCircuitBreakerService circuitBreakerService;
+
+ @Before
+ public void setup() {
+ Settings settings = Settings.EMPTY;
+ circuitBreakerService = new HierarchyCircuitBreakerService(
+ Settings.builder()
+ .put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), BREAKER_LIMIT)
+ .build(),
+ new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
+ // we can do this here only because we know that we don't adjust breaker settings dynamically in the test
+ inFlightRequestsBreaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
+
+ HttpServerTransport httpServerTransport = new TestHttpServerTransport();
+ restController = new RestController(settings, Collections.emptySet(), null, null, circuitBreakerService);
+ restController.registerHandler(RestRequest.Method.GET, "/",
+ (request, channel, client) -> channel.sendResponse(
+ new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY)));
+ restController.registerHandler(RestRequest.Method.GET, "/error", (request, channel, client) -> {
+ throw new IllegalArgumentException("test error");
+ });
+
+ httpServerTransport.start();
+ }
+
+
public void testApplyRelevantHeaders() throws Exception {
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
Set headers = new HashSet<>(Arrays.asList("header.1", "header.2"));
- final RestController restController = new RestController(Settings.EMPTY, headers, null);
+ final RestController restController = new RestController(Settings.EMPTY, headers, null, null, circuitBreakerService);
restController.registerHandler(RestRequest.Method.GET, "/",
(RestRequest request, RestChannel channel, NodeClient client) -> {
assertEquals("true", threadContext.getHeader("header.1"));
@@ -66,7 +110,7 @@ public class RestControllerTests extends ESTestCase {
}
public void testCanTripCircuitBreaker() throws Exception {
- RestController controller = new RestController(Settings.EMPTY, Collections.emptySet(), null);
+ RestController controller = new RestController(Settings.EMPTY, Collections.emptySet(), null, null, circuitBreakerService);
// trip circuit breaker by default
controller.registerHandler(RestRequest.Method.GET, "/trip", new FakeRestHandler(true));
controller.registerHandler(RestRequest.Method.GET, "/do-not-trip", new FakeRestHandler(false));
@@ -126,7 +170,8 @@ public class RestControllerTests extends ESTestCase {
assertSame(handler, h);
return (RestRequest request, RestChannel channel, NodeClient client) -> wrapperCalled.set(true);
};
- final RestController restController = new RestController(Settings.EMPTY, Collections.emptySet(), wrapper);
+ final RestController restController = new RestController(Settings.EMPTY, Collections.emptySet(), wrapper, null,
+ circuitBreakerService);
restController.registerHandler(RestRequest.Method.GET, "/", handler);
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
restController.dispatchRequest(new FakeRestRequest.Builder(xContentRegistry()).build(), null, null, threadContext);
@@ -154,4 +199,156 @@ public class RestControllerTests extends ESTestCase {
return canTripCircuitBreaker;
}
}
+
+ public void testDispatchRequestAddsAndFreesBytesOnSuccess() {
+ int contentLength = BREAKER_LIMIT.bytesAsInt();
+ String content = randomAsciiOfLength(contentLength);
+ TestRestRequest request = new TestRestRequest("/", content);
+ AssertingChannel channel = new AssertingChannel(request, true, RestStatus.OK);
+
+ restController.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY));
+
+ assertEquals(0, inFlightRequestsBreaker.getTrippedCount());
+ assertEquals(0, inFlightRequestsBreaker.getUsed());
+ }
+
+ public void testDispatchRequestAddsAndFreesBytesOnError() {
+ int contentLength = BREAKER_LIMIT.bytesAsInt();
+ String content = randomAsciiOfLength(contentLength);
+ TestRestRequest request = new TestRestRequest("/error", content);
+ AssertingChannel channel = new AssertingChannel(request, true, RestStatus.BAD_REQUEST);
+
+ restController.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY));
+
+ assertEquals(0, inFlightRequestsBreaker.getTrippedCount());
+ assertEquals(0, inFlightRequestsBreaker.getUsed());
+ }
+
+ public void testDispatchRequestAddsAndFreesBytesOnlyOnceOnError() {
+ int contentLength = BREAKER_LIMIT.bytesAsInt();
+ String content = randomAsciiOfLength(contentLength);
+ // we will produce an error in the rest handler and one more when sending the error response
+ TestRestRequest request = new TestRestRequest("/error", content);
+ ExceptionThrowingChannel channel = new ExceptionThrowingChannel(request, true);
+
+ restController.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY));
+
+ assertEquals(0, inFlightRequestsBreaker.getTrippedCount());
+ assertEquals(0, inFlightRequestsBreaker.getUsed());
+ }
+
+ public void testDispatchRequestLimitsBytes() {
+ int contentLength = BREAKER_LIMIT.bytesAsInt() + 1;
+ String content = randomAsciiOfLength(contentLength);
+ TestRestRequest request = new TestRestRequest("/", content);
+ AssertingChannel channel = new AssertingChannel(request, true, RestStatus.SERVICE_UNAVAILABLE);
+
+ restController.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY));
+
+ assertEquals(1, inFlightRequestsBreaker.getTrippedCount());
+ assertEquals(0, inFlightRequestsBreaker.getUsed());
+ }
+
+ private static final class TestHttpServerTransport extends AbstractLifecycleComponent implements
+ HttpServerTransport {
+
+ public TestHttpServerTransport() {
+ super(Settings.EMPTY);
+ }
+
+ @Override
+ protected void doStart() {
+ }
+
+ @Override
+ protected void doStop() {
+ }
+
+ @Override
+ protected void doClose() {
+ }
+
+ @Override
+ public BoundTransportAddress boundAddress() {
+ TransportAddress transportAddress = buildNewFakeTransportAddress();
+ return new BoundTransportAddress(new TransportAddress[] {transportAddress} ,transportAddress);
+ }
+
+ @Override
+ public HttpInfo info() {
+ return null;
+ }
+
+ @Override
+ public HttpStats stats() {
+ return null;
+ }
+ }
+
+ private static final class AssertingChannel extends AbstractRestChannel {
+ private final RestStatus expectedStatus;
+
+ protected AssertingChannel(RestRequest request, boolean detailedErrorsEnabled, RestStatus expectedStatus) {
+ super(request, detailedErrorsEnabled);
+ this.expectedStatus = expectedStatus;
+ }
+
+ @Override
+ public void sendResponse(RestResponse response) {
+ assertEquals(expectedStatus, response.status());
+ }
+ }
+
+ private static final class ExceptionThrowingChannel extends AbstractRestChannel {
+
+ protected ExceptionThrowingChannel(RestRequest request, boolean detailedErrorsEnabled) {
+ super(request, detailedErrorsEnabled);
+ }
+
+ @Override
+ public void sendResponse(RestResponse response) {
+ throw new IllegalStateException("always throwing an exception for testing");
+ }
+ }
+
+ private static final class TestRestRequest extends RestRequest {
+
+ private final BytesReference content;
+
+ private TestRestRequest(String path, String content) {
+ super(NamedXContentRegistry.EMPTY, Collections.emptyMap(), path);
+ this.content = new BytesArray(content);
+ }
+
+ @Override
+ public Method method() {
+ return Method.GET;
+ }
+
+ @Override
+ public String uri() {
+ return null;
+ }
+
+ @Override
+ public boolean hasContent() {
+ return true;
+ }
+
+ @Override
+ public BytesReference content() {
+ return content;
+ }
+
+ @Override
+ public String header(String name) {
+ return null;
+ }
+
+ @Override
+ public Iterable> headers() {
+ return null;
+ }
+
+ }
}
diff --git a/core/src/test/java/org/elasticsearch/rest/action/admin/cluster/RestNodesStatsActionTests.java b/core/src/test/java/org/elasticsearch/rest/action/admin/cluster/RestNodesStatsActionTests.java
index 9de530d417d..ba478331ca0 100644
--- a/core/src/test/java/org/elasticsearch/rest/action/admin/cluster/RestNodesStatsActionTests.java
+++ b/core/src/test/java/org/elasticsearch/rest/action/admin/cluster/RestNodesStatsActionTests.java
@@ -21,6 +21,7 @@ package org.elasticsearch.rest.action.admin.cluster;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.test.ESTestCase;
@@ -43,7 +44,7 @@ public class RestNodesStatsActionTests extends ESTestCase {
@Override
public void setUp() throws Exception {
super.setUp();
- action = new RestNodesStatsAction(Settings.EMPTY, new RestController(Settings.EMPTY, Collections.emptySet(), null));
+ action = new RestNodesStatsAction(Settings.EMPTY, new RestController(Settings.EMPTY, Collections.emptySet(), null, null, null));
}
public void testUnrecognizedMetric() throws IOException {
diff --git a/core/src/test/java/org/elasticsearch/rest/action/admin/indices/RestIndicesStatsActionTests.java b/core/src/test/java/org/elasticsearch/rest/action/admin/indices/RestIndicesStatsActionTests.java
index feac1672c11..0aa6e497836 100644
--- a/core/src/test/java/org/elasticsearch/rest/action/admin/indices/RestIndicesStatsActionTests.java
+++ b/core/src/test/java/org/elasticsearch/rest/action/admin/indices/RestIndicesStatsActionTests.java
@@ -41,7 +41,7 @@ public class RestIndicesStatsActionTests extends ESTestCase {
@Override
public void setUp() throws Exception {
super.setUp();
- action = new RestIndicesStatsAction(Settings.EMPTY, new RestController(Settings.EMPTY, Collections.emptySet(), null));
+ action = new RestIndicesStatsAction(Settings.EMPTY, new RestController(Settings.EMPTY, Collections.emptySet(), null, null, null));
}
public void testUnrecognizedMetric() throws IOException {
diff --git a/core/src/test/java/org/elasticsearch/rest/action/cat/RestIndicesActionTests.java b/core/src/test/java/org/elasticsearch/rest/action/cat/RestIndicesActionTests.java
index 2a22541bbc7..ae66664b456 100644
--- a/core/src/test/java/org/elasticsearch/rest/action/cat/RestIndicesActionTests.java
+++ b/core/src/test/java/org/elasticsearch/rest/action/cat/RestIndicesActionTests.java
@@ -74,7 +74,7 @@ public class RestIndicesActionTests extends ESTestCase {
public void testBuildTable() {
final Settings settings = Settings.EMPTY;
- final RestController restController = new RestController(settings, Collections.emptySet(), null);
+ final RestController restController = new RestController(settings, Collections.emptySet(), null, null, null);
final RestIndicesAction action = new RestIndicesAction(settings, restController, new IndexNameExpressionResolver(settings));
// build a (semi-)random table
diff --git a/core/src/test/java/org/elasticsearch/rest/action/cat/RestRecoveryActionTests.java b/core/src/test/java/org/elasticsearch/rest/action/cat/RestRecoveryActionTests.java
index fa93e4a80d3..88623687bf1 100644
--- a/core/src/test/java/org/elasticsearch/rest/action/cat/RestRecoveryActionTests.java
+++ b/core/src/test/java/org/elasticsearch/rest/action/cat/RestRecoveryActionTests.java
@@ -50,7 +50,7 @@ public class RestRecoveryActionTests extends ESTestCase {
public void testRestRecoveryAction() {
final Settings settings = Settings.EMPTY;
- final RestController restController = new RestController(settings, Collections.emptySet(), null);
+ final RestController restController = new RestController(settings, Collections.emptySet(), null, null, null);
final RestRecoveryAction action = new RestRecoveryAction(settings, restController, restController);
final int totalShards = randomIntBetween(1, 32);
final int successfulShards = Math.max(0, totalShards - randomIntBetween(1, 2));
diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java
index c6c2899e4b3..138ed0a67be 100644
--- a/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java
+++ b/modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerTransport.java
@@ -63,7 +63,6 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.http.BindHttpException;
import org.elasticsearch.http.HttpInfo;
-import org.elasticsearch.http.HttpServerAdapter;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.http.netty4.cors.Netty4CorsConfig;
@@ -210,6 +209,7 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
protected final ByteSizeValue maxCumulationBufferCapacity;
protected final int maxCompositeBufferComponents;
+ private final Dispatcher dispatcher;
protected volatile ServerBootstrap serverBootstrap;
@@ -220,17 +220,17 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
// package private for testing
Netty4OpenChannelsHandler serverOpenChannels;
- protected volatile HttpServerAdapter httpServerAdapter;
private final Netty4CorsConfig corsConfig;
public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
- NamedXContentRegistry xContentRegistry) {
+ NamedXContentRegistry xContentRegistry, Dispatcher dispatcher) {
super(settings);
this.networkService = networkService;
this.bigArrays = bigArrays;
this.threadPool = threadPool;
this.xContentRegistry = xContentRegistry;
+ this.dispatcher = dispatcher;
ByteSizeValue maxContentLength = SETTING_HTTP_MAX_CONTENT_LENGTH.get(settings);
this.maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings);
@@ -286,11 +286,6 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
return this.settings;
}
- @Override
- public void httpServerAdapter(HttpServerAdapter httpServerAdapter) {
- this.httpServerAdapter = httpServerAdapter;
- }
-
@Override
protected void doStart() {
boolean success = false;
@@ -331,6 +326,9 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress);
this.boundAddress = createBoundHttpAddress();
+ if (logger.isInfoEnabled()) {
+ logger.info("{}", boundAddress);
+ }
success = true;
} finally {
if (success == false) {
@@ -511,7 +509,7 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
}
protected void dispatchRequest(RestRequest request, RestChannel channel) {
- httpServerAdapter.dispatchRequest(request, channel, threadPool.getThreadContext());
+ dispatcher.dispatch(request, channel, threadPool.getThreadContext());
}
protected void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java
index 6a435c19efa..0516a449629 100644
--- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java
+++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java
@@ -93,9 +93,12 @@ public class Netty4Plugin extends Plugin implements NetworkPlugin {
@Override
public Map> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
- CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
- NamedXContentRegistry xContentRegistry, NetworkService networkService) {
+ CircuitBreakerService circuitBreakerService,
+ NamedWriteableRegistry namedWriteableRegistry,
+ NamedXContentRegistry xContentRegistry,
+ NetworkService networkService,
+ HttpServerTransport.Dispatcher dispatcher) {
return Collections.singletonMap(NETTY_HTTP_TRANSPORT_NAME,
- () -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry));
+ () -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher));
}
}
diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java
index 457b2242af4..c7427e717b3 100644
--- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java
+++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpChannelTests.java
@@ -188,7 +188,8 @@ public class Netty4HttpChannelTests extends ESTestCase {
public void testHeadersSet() {
Settings settings = Settings.builder().build();
try (Netty4HttpServerTransport httpServerTransport =
- new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry())) {
+ new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry(),
+ (request, channel, context) -> {})) {
httpServerTransport.start();
final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
httpRequest.headers().add(HttpHeaderNames.ORIGIN, "remote");
@@ -218,7 +219,8 @@ public class Netty4HttpChannelTests extends ESTestCase {
public void testConnectionClose() throws Exception {
final Settings settings = Settings.builder().build();
try (Netty4HttpServerTransport httpServerTransport =
- new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry())) {
+ new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry(),
+ (request, channel, context) -> {})) {
httpServerTransport.start();
final FullHttpRequest httpRequest;
final boolean close = randomBoolean();
@@ -253,7 +255,8 @@ public class Netty4HttpChannelTests extends ESTestCase {
private FullHttpResponse executeRequest(final Settings settings, final String originValue, final String host) {
// construct request and send it over the transport layer
try (Netty4HttpServerTransport httpServerTransport =
- new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry())) {
+ new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry(),
+ (request, channel, context) -> {})) {
httpServerTransport.start();
final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
if (originValue != null) {
diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java
index 5c7a249f74a..c0f8746d514 100644
--- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java
+++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerPipeliningTests.java
@@ -160,7 +160,7 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase {
Netty4HttpServerPipeliningTests.this.networkService,
Netty4HttpServerPipeliningTests.this.bigArrays,
Netty4HttpServerPipeliningTests.this.threadPool,
- xContentRegistry());
+ xContentRegistry(), (request, channel, context) -> {});
}
@Override
diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java
index 7481ba4c3a3..e3dd6d8a78e 100644
--- a/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java
+++ b/modules/transport-netty4/src/test/java/org/elasticsearch/http/netty4/Netty4HttpServerTransportTests.java
@@ -121,9 +121,8 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
*/
public void testExpectContinueHeader() throws Exception {
try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(Settings.EMPTY, networkService, bigArrays, threadPool,
- xContentRegistry())) {
- transport.httpServerAdapter((request, channel, context) ->
- channel.sendResponse(new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, new BytesArray("done"))));
+ xContentRegistry(), (request, channel, context) ->
+ channel.sendResponse(new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, new BytesArray("done"))))) {
transport.start();
TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
@@ -145,12 +144,12 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
public void testBindUnavailableAddress() {
try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(Settings.EMPTY, networkService, bigArrays, threadPool,
- xContentRegistry())) {
+ xContentRegistry(), (request, channel, context) -> {})) {
transport.start();
TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
Settings settings = Settings.builder().put("http.port", remoteAddress.getPort()).build();
try (Netty4HttpServerTransport otherTransport = new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool,
- xContentRegistry())) {
+ xContentRegistry(), (request, channel, context) -> {})) {
BindHttpException bindHttpException = expectThrows(BindHttpException.class, () -> otherTransport.start());
assertEquals("Failed to bind to [" + remoteAddress.getPort() + "]", bindHttpException.getMessage());
}
diff --git a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java
index 6a2e6bacae4..8774ba5836b 100644
--- a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java
+++ b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java
@@ -29,18 +29,14 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
-import org.elasticsearch.discovery.zen.UnicastHostsProvider;
-import org.elasticsearch.discovery.zen.ZenPing;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.recovery.RecoverySettings;
-import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.MockSearchService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.fetch.FetchPhase;
-import org.elasticsearch.test.discovery.MockZenPing;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
diff --git a/test/framework/src/main/java/org/elasticsearch/test/AbstractQueryTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/AbstractQueryTestCase.java
index 01bab59eb27..2a230106994 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/AbstractQueryTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/AbstractQueryTestCase.java
@@ -81,7 +81,7 @@ import org.elasticsearch.indices.analysis.AnalysisModule;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.mapper.MapperRegistry;
-import org.elasticsearch.node.internal.InternalSettingsPreparer;
+import org.elasticsearch.node.InternalSettingsPreparer;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
index 6c575126276..b4aee750a31 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java
@@ -27,7 +27,6 @@ import com.carrotsearch.randomizedtesting.generators.RandomStrings;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
@@ -85,7 +84,7 @@ import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.node.MockNode;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeValidationException;
-import org.elasticsearch.node.service.NodeService;
+import org.elasticsearch.node.NodeService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchService;