Remove HttpServer and HttpServerAdapter in favor of a simple dispatch method (#22636)

Today we have quite some abstractions that are essentially providing a simple
dispatch method to the plugins defining a `HttpServerTransport`. This commit
removes `HttpServer` and `HttpServerAdaptor` and introduces a simple `Dispatcher` functional
interface that delegate to `RestController` by default.

Relates to #18482
This commit is contained in:
Simon Willnauer 2017-01-16 21:06:08 +01:00 committed by GitHub
parent 193111919c
commit f30b1f82ee
43 changed files with 464 additions and 583 deletions

View File

@ -430,7 +430,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]monitor[/\\]jvm[/\\]GcNames.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]monitor[/\\]jvm[/\\]HotThreads.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]node[/\\]Node.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]node[/\\]internal[/\\]InternalSettingsPreparer.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]plugins[/\\]PluginsService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]repositories[/\\]RepositoriesService.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]repositories[/\\]Repository.java" checks="LineLength" />

View File

@ -19,7 +19,6 @@
package org.elasticsearch.action;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -196,6 +195,7 @@ import org.elasticsearch.action.termvectors.TransportShardMultiTermsVectorAction
import org.elasticsearch.action.termvectors.TransportTermVectorsAction;
import org.elasticsearch.action.update.TransportUpdateAction;
import org.elasticsearch.action.update.UpdateAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.NamedRegistry;
import org.elasticsearch.common.inject.AbstractModule;
@ -205,6 +205,7 @@ import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.ActionPlugin.ActionHandler;
import org.elasticsearch.rest.RestController;
@ -332,7 +333,8 @@ public class ActionModule extends AbstractModule {
private final RestController restController;
public ActionModule(boolean transportClient, Settings settings, IndexNameExpressionResolver resolver,
ClusterSettings clusterSettings, ThreadPool threadPool, List<ActionPlugin> actionPlugins) {
ClusterSettings clusterSettings, ThreadPool threadPool, List<ActionPlugin> actionPlugins,
NodeClient nodeClient, CircuitBreakerService circuitBreakerService) {
this.transportClient = transportClient;
this.settings = settings;
this.actionPlugins = actionPlugins;
@ -352,9 +354,14 @@ public class ActionModule extends AbstractModule {
restWrapper = newRestWrapper;
}
}
restController = new RestController(settings, headers, restWrapper);
if (transportClient) {
restController = null;
} else {
restController = new RestController(settings, headers, restWrapper, nodeClient, circuitBreakerService);
}
}
public Map<String, ActionHandler<?, ?>> getActions() {
return actions;
}
@ -648,8 +655,10 @@ public class ActionModule extends AbstractModule {
}
}
// Bind the RestController which is required (by Node) even if rest isn't enabled.
bind(RestController.class).toInstance(restController);
if (restController != null) {
// Bind the RestController which is required (by Node) even if rest isn't enabled.
bind(RestController.class).toInstance(restController);
}
// Setup the RestHandlers
if (NetworkModule.HTTP_ENABLED.get(settings)) {

View File

@ -29,7 +29,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.node.NodeService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

View File

@ -29,7 +29,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.node.NodeService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

View File

@ -39,7 +39,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.node.NodeService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

View File

@ -30,7 +30,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.node.NodeService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

View File

@ -30,7 +30,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.node.NodeService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

View File

@ -36,7 +36,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.ingest.IngestInfo;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.node.NodeService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

View File

@ -19,7 +19,6 @@
package org.elasticsearch.action.ingest;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
@ -28,7 +27,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.ingest.PipelineStore;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.node.NodeService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

View File

@ -40,7 +40,6 @@ import org.elasticsearch.common.logging.LogConfigurator;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.network.IfConfig;
import org.elasticsearch.common.settings.KeyStoreWrapper;
import org.elasticsearch.common.settings.SecureSetting;
import org.elasticsearch.common.settings.SecureSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
@ -50,7 +49,7 @@ import org.elasticsearch.monitor.os.OsProbe;
import org.elasticsearch.monitor.process.ProcessProbe;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.node.InternalSettingsPreparer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

View File

@ -24,7 +24,7 @@ import joptsimple.OptionSpec;
import joptsimple.util.KeyValuePair;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.node.InternalSettingsPreparer;
import java.util.HashMap;
import java.util.Locale;

View File

@ -46,7 +46,7 @@ import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.node.InternalSettingsPreparer;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.Plugin;
@ -160,7 +160,7 @@ public abstract class TransportClient extends AbstractClient {
}
modules.add(b -> b.bind(ThreadPool.class).toInstance(threadPool));
ActionModule actionModule = new ActionModule(true, settings, null, settingsModule.getClusterSettings(),
threadPool, pluginsService.filterPlugins(ActionPlugin.class));
threadPool, pluginsService.filterPlugins(ActionPlugin.class), null, null);
modules.add(actionModule);
CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(),
@ -170,7 +170,7 @@ public abstract class TransportClient extends AbstractClient {
resourcesToClose.add(bigArrays);
modules.add(settingsModule);
NetworkModule networkModule = new NetworkModule(settings, true, pluginsService.filterPlugins(NetworkPlugin.class), threadPool,
bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService);
bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null);
final Transport transport = networkModule.getTransportSupplier().get();
final TransportService transportService = new TransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(),

View File

@ -38,6 +38,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry.FromXContent;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.tasks.RawTaskStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
@ -109,13 +110,13 @@ public final class NetworkModule {
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NamedXContentRegistry xContentRegistry,
NetworkService networkService) {
NetworkService networkService, HttpServerTransport.Dispatcher dispatcher) {
this.settings = settings;
this.transportClient = transportClient;
for (NetworkPlugin plugin : plugins) {
if (transportClient == false && HTTP_ENABLED.get(settings)) {
Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays,
circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService);
circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, dispatcher);
for (Map.Entry<String, Supplier<HttpServerTransport>> entry : httpTransportFactory.entrySet()) {
registerHttpTransport(entry.getKey(), entry.getValue());
}

View File

@ -54,6 +54,7 @@ public class SettingsModule implements Module {
private final Logger logger;
private final IndexScopedSettings indexScopedSettings;
private final ClusterSettings clusterSettings;
private final SettingsFilter settingsFilter;
public SettingsModule(Settings settings, Setting<?>... additionalSettings) {
this(settings, Arrays.asList(additionalSettings), Collections.emptyList());
@ -137,12 +138,13 @@ public class SettingsModule implements Module {
final Predicate<String> acceptOnlyClusterSettings = TRIBE_CLIENT_NODE_SETTINGS_PREDICATE.negate();
clusterSettings.validate(settings.filter(acceptOnlyClusterSettings));
validateTribeSettings(settings, clusterSettings);
this.settingsFilter = new SettingsFilter(settings, settingsFilterPattern);
}
@Override
public void configure(Binder binder) {
binder.bind(Settings.class).toInstance(settings);
binder.bind(SettingsFilter.class).toInstance(new SettingsFilter(settings, settingsFilterPattern));
binder.bind(SettingsFilter.class).toInstance(settingsFilter);
binder.bind(ClusterSettings.class).toInstance(clusterSettings);
binder.bind(IndexScopedSettings.class).toInstance(indexScopedSettings);
}
@ -218,4 +220,6 @@ public class SettingsModule implements Module {
public ClusterSettings getClusterSettings() {
return clusterSettings;
}
public SettingsFilter getSettingsFilter() { return settingsFilter; }
}

View File

@ -1,206 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.rest.RestStatus.FORBIDDEN;
import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR;
/**
* A component to serve http requests, backed by rest handlers.
*/
public class HttpServer extends AbstractLifecycleComponent implements HttpServerAdapter {
private final HttpServerTransport transport;
private final RestController restController;
private final NodeClient client;
private final CircuitBreakerService circuitBreakerService;
public HttpServer(Settings settings, HttpServerTransport transport, RestController restController,
NodeClient client, CircuitBreakerService circuitBreakerService) {
super(settings);
this.transport = transport;
this.restController = restController;
this.client = client;
this.circuitBreakerService = circuitBreakerService;
transport.httpServerAdapter(this);
}
@Override
protected void doStart() {
transport.start();
if (logger.isInfoEnabled()) {
logger.info("{}", transport.boundAddress());
}
}
@Override
protected void doStop() {
transport.stop();
}
@Override
protected void doClose() {
transport.close();
}
public HttpInfo info() {
return transport.info();
}
public HttpStats stats() {
return transport.stats();
}
public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
if (request.rawPath().equals("/favicon.ico")) {
handleFavicon(request, channel);
return;
}
RestChannel responseChannel = channel;
try {
int contentLength = request.content().length();
if (restController.canTripCircuitBreaker(request)) {
inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
} else {
inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
}
// iff we could reserve bytes for the request we need to send the response also over this channel
responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
restController.dispatchRequest(request, responseChannel, client, threadContext);
} catch (Exception e) {
try {
responseChannel.sendResponse(new BytesRestResponse(channel, e));
} catch (Exception inner) {
inner.addSuppressed(e);
logger.error((Supplier<?>) () ->
new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner);
}
}
}
void handleFavicon(RestRequest request, RestChannel channel) {
if (request.method() == RestRequest.Method.GET) {
try {
try (InputStream stream = getClass().getResourceAsStream("/config/favicon.ico")) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(stream, out);
BytesRestResponse restResponse = new BytesRestResponse(RestStatus.OK, "image/x-icon", out.toByteArray());
channel.sendResponse(restResponse);
}
} catch (IOException e) {
channel.sendResponse(new BytesRestResponse(INTERNAL_SERVER_ERROR, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY));
}
} else {
channel.sendResponse(new BytesRestResponse(FORBIDDEN, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY));
}
}
private static final class ResourceHandlingHttpChannel implements RestChannel {
private final RestChannel delegate;
private final CircuitBreakerService circuitBreakerService;
private final int contentLength;
private final AtomicBoolean closed = new AtomicBoolean();
public ResourceHandlingHttpChannel(RestChannel delegate, CircuitBreakerService circuitBreakerService, int contentLength) {
this.delegate = delegate;
this.circuitBreakerService = circuitBreakerService;
this.contentLength = contentLength;
}
@Override
public XContentBuilder newBuilder() throws IOException {
return delegate.newBuilder();
}
@Override
public XContentBuilder newErrorBuilder() throws IOException {
return delegate.newErrorBuilder();
}
@Override
public XContentBuilder newBuilder(@Nullable BytesReference autoDetectSource, boolean useFiltering) throws IOException {
return delegate.newBuilder(autoDetectSource, useFiltering);
}
@Override
public BytesStreamOutput bytesOutput() {
return delegate.bytesOutput();
}
@Override
public RestRequest request() {
return delegate.request();
}
@Override
public boolean detailedErrorsEnabled() {
return delegate.detailedErrorsEnabled();
}
@Override
public void sendResponse(RestResponse response) {
close();
delegate.sendResponse(response);
}
private void close() {
// attempt to close once atomically
if (closed.compareAndSet(false, true) == false) {
throw new IllegalStateException("Channel is already closed");
}
inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(-contentLength);
}
}
private static CircuitBreaker inFlightRequestsBreaker(CircuitBreakerService circuitBreakerService) {
// We always obtain a fresh breaker to reflect changes to the breaker configuration.
return circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
}
}

View File

@ -1,30 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
public interface HttpServerAdapter {
void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext context);
}

View File

@ -21,6 +21,9 @@ package org.elasticsearch.http;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
public interface HttpServerTransport extends LifecycleComponent {
@ -33,6 +36,15 @@ public interface HttpServerTransport extends LifecycleComponent {
HttpStats stats();
void httpServerAdapter(HttpServerAdapter httpServerAdapter);
@FunctionalInterface
interface Dispatcher {
/**
* Dispatches the {@link RestRequest} to the relevant request handler or responds to the given rest channel directly if
* the request can't be handled by any request handler.
* @param request the request to dispatch
* @param channel the response channel of this request
* @param threadContext the nodes thread context
*/
void dispatch(RestRequest request, RestChannel channel, ThreadContext threadContext);
}
}

View File

@ -21,10 +21,6 @@ package org.elasticsearch.index.translog;
import org.elasticsearch.cli.MultiCommand;
import org.elasticsearch.cli.Terminal;
import org.elasticsearch.common.logging.LogConfigurator;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
/**
* Class encapsulating and dispatching commands from the {@code elasticsearch-translog} command line tool

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.node.internal;
package org.elasticsearch.node;
import java.io.IOException;
import java.nio.file.Files;
@ -106,7 +106,8 @@ public class InternalSettingsPreparer {
}
}
if (foundSuffixes.size() > 1) {
throw new SettingsException("multiple settings files found with suffixes: " + Strings.collectionToDelimitedString(foundSuffixes, ","));
throw new SettingsException("multiple settings files found with suffixes: "
+ Strings.collectionToDelimitedString(foundSuffixes, ","));
}
// re-initialize settings now that the config file has been loaded
@ -195,7 +196,9 @@ public class InternalSettingsPreparer {
private static String promptForValue(String key, Terminal terminal, boolean secret) {
if (terminal == null) {
throw new UnsupportedOperationException("found property [" + key + "] with value [" + (secret ? SECRET_PROMPT_VALUE : TEXT_PROMPT_VALUE) +"]. prompting for property values is only supported when running elasticsearch in the foreground");
throw new UnsupportedOperationException("found property [" + key + "] with value ["
+ (secret ? SECRET_PROMPT_VALUE : TEXT_PROMPT_VALUE)
+ "]. prompting for property values is only supported when running elasticsearch in the foreground");
}
if (secret) {

View File

@ -84,7 +84,6 @@ import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.gateway.GatewayModule;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.gateway.MetaStateService;
import org.elasticsearch.http.HttpServer;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.indices.IndicesModule;
@ -101,8 +100,6 @@ import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.AnalysisPlugin;
import org.elasticsearch.plugins.ClusterPlugin;
@ -117,6 +114,7 @@ import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.plugins.ScriptPlugin;
import org.elasticsearch.plugins.SearchPlugin;
import org.elasticsearch.repositories.RepositoriesModule;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchModule;
@ -342,14 +340,18 @@ public class Node implements Closeable {
modules.add(clusterModule);
IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
modules.add(indicesModule);
SearchModule searchModule = new SearchModule(settings, false, pluginsService.filterPlugins(SearchPlugin.class));
ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
settingsModule.getClusterSettings(), threadPool, pluginsService.filterPlugins(ActionPlugin.class));
modules.add(actionModule);
modules.add(new GatewayModule());
CircuitBreakerService circuitBreakerService = createCircuitBreakerService(settingsModule.getSettings(),
settingsModule.getClusterSettings());
resourcesToClose.add(circuitBreakerService);
ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
settingsModule.getClusterSettings(), threadPool, pluginsService.filterPlugins(ActionPlugin.class), client,
circuitBreakerService);
modules.add(actionModule);
modules.add(new GatewayModule());
BigArrays bigArrays = createBigArrays(settings, circuitBreakerService);
resourcesToClose.add(bigArrays);
modules.add(settingsModule);
@ -388,30 +390,35 @@ public class Node implements Closeable {
pluginsService.filterPlugins(Plugin.class).stream()
.map(Plugin::getCustomMetaDataUpgrader)
.collect(Collectors.toList());
final RestController restController = actionModule.getRestController();
final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService);
threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService,
restController::dispatchRequest);
final MetaDataUpgrader metaDataUpgrader = new MetaDataUpgrader(customMetaDataUpgraders);
final Transport transport = networkModule.getTransportSupplier().get();
final TransportService transportService = newTransportService(settings, transport, threadPool,
networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings());
final Consumer<Binder> httpBind;
final HttpServerTransport httpServerTransport;
if (networkModule.isHttpEnabled()) {
HttpServerTransport httpServerTransport = networkModule.getHttpServerTransportSupplier().get();
HttpServer httpServer = new HttpServer(settings, httpServerTransport, actionModule.getRestController(), client,
circuitBreakerService);
httpServerTransport = networkModule.getHttpServerTransportSupplier().get();
httpBind = b -> {
b.bind(HttpServer.class).toInstance(httpServer);
b.bind(HttpServerTransport.class).toInstance(httpServerTransport);
};
} else {
httpBind = b -> {
b.bind(HttpServer.class).toProvider(Providers.of(null));
b.bind(HttpServerTransport.class).toProvider(Providers.of(null));
};
httpServerTransport = null;
}
final DiscoveryModule discoveryModule = new DiscoveryModule(this.settings, threadPool, transportService,
namedWriteableRegistry, networkService, clusterService, pluginsService.filterPlugins(DiscoveryPlugin.class));
NodeService nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),
httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter());
modules.add(b -> {
b.bind(NodeService.class).toInstance(nodeService);
b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
b.bind(PluginsService.class).toInstance(pluginsService);
b.bind(Client.class).toInstance(client);
@ -628,7 +635,7 @@ public class Node implements Closeable {
}
if (NetworkModule.HTTP_ENABLED.get(settings)) {
injector.getInstance(HttpServer.class).start();
injector.getInstance(HttpServerTransport.class).start();
}
// start nodes now, after the http server, because it may take some time
@ -658,7 +665,7 @@ public class Node implements Closeable {
injector.getInstance(TribeService.class).stop();
injector.getInstance(ResourceWatcherService.class).stop();
if (NetworkModule.HTTP_ENABLED.get(settings)) {
injector.getInstance(HttpServer.class).stop();
injector.getInstance(HttpServerTransport.class).stop();
}
injector.getInstance(SnapshotsService.class).stop();
@ -708,7 +715,7 @@ public class Node implements Closeable {
toClose.add(injector.getInstance(NodeService.class));
toClose.add(() -> stopWatch.stop().start("http"));
if (NetworkModule.HTTP_ENABLED.get(settings)) {
toClose.add(injector.getInstance(HttpServer.class));
toClose.add(injector.getInstance(HttpServerTransport.class));
}
toClose.add(() -> stopWatch.stop().start("snapshot_service"));
toClose.add(injector.getInstance(SnapshotsService.class));

View File

@ -22,7 +22,6 @@ package org.elasticsearch.node;
import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor;
import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.monitor.MonitorService;
import org.elasticsearch.node.service.NodeService;
public class NodeModule extends AbstractModule {
@ -38,7 +37,6 @@ public class NodeModule extends AbstractModule {
protected void configure() {
bind(Node.class).toInstance(node);
bind(MonitorService.class).toInstance(monitorService);
bind(NodeService.class).asEagerSingleton();
bind(DiskThresholdMonitor.class).asEagerSingleton();
}
}

View File

@ -17,7 +17,7 @@
* under the License.
*/
package org.elasticsearch.node.service;
package org.elasticsearch.node;
import org.elasticsearch.Build;
import org.elasticsearch.Version;
@ -27,11 +27,10 @@ import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.http.HttpServer;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.ingest.IngestService;
@ -55,17 +54,16 @@ public class NodeService extends AbstractComponent implements Closeable {
private final IngestService ingestService;
private final SettingsFilter settingsFilter;
private ScriptService scriptService;
private final HttpServerTransport httpServerTransport;
@Nullable
private final HttpServer httpServer;
private final Discovery discovery;
@Inject
public NodeService(Settings settings, ThreadPool threadPool, MonitorService monitorService, Discovery discovery,
NodeService(Settings settings, ThreadPool threadPool, MonitorService monitorService, Discovery discovery,
TransportService transportService, IndicesService indicesService, PluginsService pluginService,
CircuitBreakerService circuitBreakerService, ScriptService scriptService, @Nullable HttpServer httpServer,
IngestService ingestService, ClusterService clusterService, SettingsFilter settingsFilter) {
CircuitBreakerService circuitBreakerService, ScriptService scriptService,
@Nullable HttpServerTransport httpServerTransport, IngestService ingestService, ClusterService clusterService,
SettingsFilter settingsFilter) {
super(settings);
this.threadPool = threadPool;
this.monitorService = monitorService;
@ -74,7 +72,7 @@ public class NodeService extends AbstractComponent implements Closeable {
this.discovery = discovery;
this.pluginService = pluginService;
this.circuitBreakerService = circuitBreakerService;
this.httpServer = httpServer;
this.httpServerTransport = httpServerTransport;
this.ingestService = ingestService;
this.settingsFilter = settingsFilter;
this.scriptService = scriptService;
@ -91,7 +89,7 @@ public class NodeService extends AbstractComponent implements Closeable {
jvm ? monitorService.jvmService().info() : null,
threadPool ? this.threadPool.info() : null,
transport ? transportService.info() : null,
http ? (httpServer == null ? null : httpServer.info()) : null,
http ? (httpServerTransport == null ? null : httpServerTransport.info()) : null,
plugin ? (pluginService == null ? null : pluginService.info()) : null,
ingest ? (ingestService == null ? null : ingestService.info()) : null,
indices ? indicesService.getTotalIndexingBufferBytes() : null
@ -111,7 +109,7 @@ public class NodeService extends AbstractComponent implements Closeable {
threadPool ? this.threadPool.stats() : null,
fs ? monitorService.fsService().stats() : null,
transport ? transportService.stats() : null,
http ? (httpServer == null ? null : httpServer.stats()) : null,
http ? (httpServerTransport == null ? null : httpServerTransport.stats()) : null,
circuitBreaker ? circuitBreakerService.stats() : null,
script ? scriptService.stats() : null,
discoveryStats ? discovery.stats() : null,

View File

@ -33,9 +33,7 @@ import org.elasticsearch.cli.UserException;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import java.io.BufferedReader;
import java.io.IOException;
@ -63,7 +61,6 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

View File

@ -22,9 +22,7 @@ package org.elasticsearch.plugins;
import joptsimple.OptionSet;
import org.elasticsearch.cli.EnvironmentAwareCommand;
import org.elasticsearch.cli.Terminal;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import java.io.IOException;
import java.nio.file.DirectoryStream;
@ -33,7 +31,6 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* A command for the plugin cli to list plugins installed in elasticsearch.

View File

@ -69,8 +69,11 @@ public interface NetworkPlugin {
* See {@link org.elasticsearch.common.network.NetworkModule#HTTP_TYPE_SETTING} to configure a specific implementation.
*/
default Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
NamedXContentRegistry xContentRegistry, NetworkService networkService) {
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NamedXContentRegistry xContentRegistry,
NetworkService networkService,
HttpServerTransport.Dispatcher dispatcher) {
return Collections.emptyMap();
}
}

View File

@ -19,23 +19,35 @@
package org.elasticsearch.rest;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.UnaryOperator;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.path.PathTrie;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import static org.elasticsearch.rest.RestStatus.BAD_REQUEST;
import static org.elasticsearch.rest.RestStatus.FORBIDDEN;
import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static org.elasticsearch.rest.RestStatus.OK;
public class RestController extends AbstractComponent {
@ -48,18 +60,27 @@ public class RestController extends AbstractComponent {
private final UnaryOperator<RestHandler> handlerWrapper;
private final NodeClient client;
private final CircuitBreakerService circuitBreakerService;
/** Rest headers that are copied to internal requests made during a rest request. */
private final Set<String> headersToCopy;
public RestController(Settings settings, Set<String> headersToCopy, UnaryOperator<RestHandler> handlerWrapper) {
public RestController(Settings settings, Set<String> headersToCopy, UnaryOperator<RestHandler> handlerWrapper,
NodeClient client, CircuitBreakerService circuitBreakerService) {
super(settings);
this.headersToCopy = headersToCopy;
if (handlerWrapper == null) {
handlerWrapper = h -> h; // passthrough if no wrapper set
}
this.handlerWrapper = handlerWrapper;
this.client = client;
this.circuitBreakerService = circuitBreakerService;
}
/**
* Registers a REST handler to be executed when the provided {@code method} and {@code path} match the request.
*
@ -137,7 +158,34 @@ public class RestController extends AbstractComponent {
return (handler != null) ? handler.canTripCircuitBreaker() : true;
}
public void dispatchRequest(final RestRequest request, final RestChannel channel, final NodeClient client, ThreadContext threadContext) throws Exception {
public void dispatchRequest(RestRequest request, RestChannel channel, ThreadContext threadContext) {
if (request.rawPath().equals("/favicon.ico")) {
handleFavicon(request, channel);
return;
}
RestChannel responseChannel = channel;
try {
int contentLength = request.content().length();
if (canTripCircuitBreaker(request)) {
inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
} else {
inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
}
// iff we could reserve bytes for the request we need to send the response also over this channel
responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
dispatchRequest(request, responseChannel, client, threadContext);
} catch (Exception e) {
try {
responseChannel.sendResponse(new BytesRestResponse(channel, e));
} catch (Exception inner) {
inner.addSuppressed(e);
logger.error((Supplier<?>) () ->
new ParameterizedMessage("failed to send failure response for uri [{}]", request.uri()), inner);
}
}
}
void dispatchRequest(final RestRequest request, final RestChannel channel, final NodeClient client, ThreadContext threadContext) throws Exception {
if (!checkRequestParameters(request, channel)) {
return;
}
@ -223,4 +271,84 @@ public class RestController extends AbstractComponent {
// my_index/my_type/http%3A%2F%2Fwww.google.com
return request.rawPath();
}
void handleFavicon(RestRequest request, RestChannel channel) {
if (request.method() == RestRequest.Method.GET) {
try {
try (InputStream stream = getClass().getResourceAsStream("/config/favicon.ico")) {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Streams.copy(stream, out);
BytesRestResponse restResponse = new BytesRestResponse(RestStatus.OK, "image/x-icon", out.toByteArray());
channel.sendResponse(restResponse);
}
} catch (IOException e) {
channel.sendResponse(new BytesRestResponse(INTERNAL_SERVER_ERROR, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY));
}
} else {
channel.sendResponse(new BytesRestResponse(FORBIDDEN, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY));
}
}
private static final class ResourceHandlingHttpChannel implements RestChannel {
private final RestChannel delegate;
private final CircuitBreakerService circuitBreakerService;
private final int contentLength;
private final AtomicBoolean closed = new AtomicBoolean();
public ResourceHandlingHttpChannel(RestChannel delegate, CircuitBreakerService circuitBreakerService, int contentLength) {
this.delegate = delegate;
this.circuitBreakerService = circuitBreakerService;
this.contentLength = contentLength;
}
@Override
public XContentBuilder newBuilder() throws IOException {
return delegate.newBuilder();
}
@Override
public XContentBuilder newErrorBuilder() throws IOException {
return delegate.newErrorBuilder();
}
@Override
public XContentBuilder newBuilder(@Nullable BytesReference autoDetectSource, boolean useFiltering) throws IOException {
return delegate.newBuilder(autoDetectSource, useFiltering);
}
@Override
public BytesStreamOutput bytesOutput() {
return delegate.bytesOutput();
}
@Override
public RestRequest request() {
return delegate.request();
}
@Override
public boolean detailedErrorsEnabled() {
return delegate.detailedErrorsEnabled();
}
@Override
public void sendResponse(RestResponse response) {
close();
delegate.sendResponse(response);
}
private void close() {
// attempt to close once atomically
if (closed.compareAndSet(false, true) == false) {
throw new IllegalStateException("Channel is already closed");
}
inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(-contentLength);
}
}
private static CircuitBreaker inFlightRequestsBreaker(CircuitBreakerService circuitBreakerService) {
// We always obtain a fresh breaker to reflect changes to the breaker configuration.
return circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
}
}

View File

@ -30,7 +30,6 @@ import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.http.HttpInfo;
import org.elasticsearch.http.HttpServerAdapter;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
@ -89,8 +88,6 @@ public class NetworkModuleTests extends ModuleTestCase {
public HttpStats stats() {
return null;
}
@Override
public void httpServerAdapter(HttpServerAdapter httpServerAdapter) {}
}
@ -155,7 +152,8 @@ public class NetworkModuleTests extends ModuleTestCase {
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NamedXContentRegistry xContentRegistry,
NetworkService networkService) {
NetworkService networkService,
HttpServerTransport.Dispatcher requestDispatcher) {
return Collections.singletonMap("custom", custom);
}
});
@ -195,7 +193,8 @@ public class NetworkModuleTests extends ModuleTestCase {
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NamedXContentRegistry xContentRegistry,
NetworkService networkService) {
NetworkService networkService,
HttpServerTransport.Dispatcher requestDispatcher) {
Map<String, Supplier<HttpServerTransport>> supplierMap = new HashMap<>();
supplierMap.put("custom", custom);
supplierMap.put("default_custom", def);
@ -228,7 +227,8 @@ public class NetworkModuleTests extends ModuleTestCase {
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NamedXContentRegistry xContentRegistry,
NetworkService networkService) {
NetworkService networkService,
HttpServerTransport.Dispatcher requestDispatcher) {
Map<String, Supplier<HttpServerTransport>> supplierMap = new HashMap<>();
supplierMap.put("custom", custom);
supplierMap.put("default_custom", def);
@ -276,6 +276,7 @@ public class NetworkModuleTests extends ModuleTestCase {
}
private NetworkModule newNetworkModule(Settings settings, boolean transportClient, NetworkPlugin... plugins) {
return new NetworkModule(settings, transportClient, Arrays.asList(plugins), threadPool, null, null, null, xContentRegistry(), null);
return new NetworkModule(settings, transportClient, Arrays.asList(plugins), threadPool, null, null, null, xContentRegistry(), null,
(a, b, c) -> {});
}
}

View File

@ -1,231 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http;
import java.util.Collections;
import java.util.Map;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.rest.AbstractRestChannel;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
public class HttpServerTests extends ESTestCase {
private static final ByteSizeValue BREAKER_LIMIT = new ByteSizeValue(20);
private HttpServer httpServer;
private CircuitBreaker inFlightRequestsBreaker;
@Before
public void setup() {
Settings settings = Settings.EMPTY;
CircuitBreakerService circuitBreakerService = new HierarchyCircuitBreakerService(
Settings.builder()
.put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), BREAKER_LIMIT)
.build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
// we can do this here only because we know that we don't adjust breaker settings dynamically in the test
inFlightRequestsBreaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
HttpServerTransport httpServerTransport = new TestHttpServerTransport();
RestController restController = new RestController(settings, Collections.emptySet(), null);
restController.registerHandler(RestRequest.Method.GET, "/",
(request, channel, client) -> channel.sendResponse(
new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY)));
restController.registerHandler(RestRequest.Method.GET, "/error", (request, channel, client) -> {
throw new IllegalArgumentException("test error");
});
httpServer = new HttpServer(settings, httpServerTransport, restController, null, circuitBreakerService);
httpServer.start();
}
public void testDispatchRequestAddsAndFreesBytesOnSuccess() {
int contentLength = BREAKER_LIMIT.bytesAsInt();
String content = randomAsciiOfLength(contentLength);
TestRestRequest request = new TestRestRequest("/", content);
AssertingChannel channel = new AssertingChannel(request, true, RestStatus.OK);
httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY));
assertEquals(0, inFlightRequestsBreaker.getTrippedCount());
assertEquals(0, inFlightRequestsBreaker.getUsed());
}
public void testDispatchRequestAddsAndFreesBytesOnError() {
int contentLength = BREAKER_LIMIT.bytesAsInt();
String content = randomAsciiOfLength(contentLength);
TestRestRequest request = new TestRestRequest("/error", content);
AssertingChannel channel = new AssertingChannel(request, true, RestStatus.BAD_REQUEST);
httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY));
assertEquals(0, inFlightRequestsBreaker.getTrippedCount());
assertEquals(0, inFlightRequestsBreaker.getUsed());
}
public void testDispatchRequestAddsAndFreesBytesOnlyOnceOnError() {
int contentLength = BREAKER_LIMIT.bytesAsInt();
String content = randomAsciiOfLength(contentLength);
// we will produce an error in the rest handler and one more when sending the error response
TestRestRequest request = new TestRestRequest("/error", content);
ExceptionThrowingChannel channel = new ExceptionThrowingChannel(request, true);
httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY));
assertEquals(0, inFlightRequestsBreaker.getTrippedCount());
assertEquals(0, inFlightRequestsBreaker.getUsed());
}
public void testDispatchRequestLimitsBytes() {
int contentLength = BREAKER_LIMIT.bytesAsInt() + 1;
String content = randomAsciiOfLength(contentLength);
TestRestRequest request = new TestRestRequest("/", content);
AssertingChannel channel = new AssertingChannel(request, true, RestStatus.SERVICE_UNAVAILABLE);
httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY));
assertEquals(1, inFlightRequestsBreaker.getTrippedCount());
assertEquals(0, inFlightRequestsBreaker.getUsed());
}
private static final class TestHttpServerTransport extends AbstractLifecycleComponent implements
HttpServerTransport {
public TestHttpServerTransport() {
super(Settings.EMPTY);
}
@Override
protected void doStart() {
}
@Override
protected void doStop() {
}
@Override
protected void doClose() {
}
@Override
public BoundTransportAddress boundAddress() {
TransportAddress transportAddress = buildNewFakeTransportAddress();
return new BoundTransportAddress(new TransportAddress[] {transportAddress} ,transportAddress);
}
@Override
public HttpInfo info() {
return null;
}
@Override
public HttpStats stats() {
return null;
}
@Override
public void httpServerAdapter(HttpServerAdapter httpServerAdapter) {
}
}
private static final class AssertingChannel extends AbstractRestChannel {
private final RestStatus expectedStatus;
protected AssertingChannel(RestRequest request, boolean detailedErrorsEnabled, RestStatus expectedStatus) {
super(request, detailedErrorsEnabled);
this.expectedStatus = expectedStatus;
}
@Override
public void sendResponse(RestResponse response) {
assertEquals(expectedStatus, response.status());
}
}
private static final class ExceptionThrowingChannel extends AbstractRestChannel {
protected ExceptionThrowingChannel(RestRequest request, boolean detailedErrorsEnabled) {
super(request, detailedErrorsEnabled);
}
@Override
public void sendResponse(RestResponse response) {
throw new IllegalStateException("always throwing an exception for testing");
}
}
private static final class TestRestRequest extends RestRequest {
private final BytesReference content;
private TestRestRequest(String path, String content) {
super(NamedXContentRegistry.EMPTY, Collections.emptyMap(), path);
this.content = new BytesArray(content);
}
@Override
public Method method() {
return Method.GET;
}
@Override
public String uri() {
return null;
}
@Override
public boolean hasContent() {
return true;
}
@Override
public BytesReference content() {
return content;
}
@Override
public String header(String name) {
return null;
}
@Override
public Iterable<Map.Entry<String, String>> headers() {
return null;
}
}
}

View File

@ -22,7 +22,7 @@ package org.elasticsearch.ingest;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ingest.WritePipelineResponse;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.node.NodeService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;

View File

@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.env.Environment;
import org.elasticsearch.node.InternalSettingsPreparer;
import org.elasticsearch.test.ESTestCase;
import org.junit.After;
import org.junit.Before;

View File

@ -29,11 +29,26 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.UnaryOperator;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.http.HttpInfo;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.FakeRestRequest;
import org.junit.Before;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
@ -43,10 +58,39 @@ import static org.mockito.Mockito.verify;
public class RestControllerTests extends ESTestCase {
private static final ByteSizeValue BREAKER_LIMIT = new ByteSizeValue(20);
private CircuitBreaker inFlightRequestsBreaker;
private RestController restController;
private HierarchyCircuitBreakerService circuitBreakerService;
@Before
public void setup() {
Settings settings = Settings.EMPTY;
circuitBreakerService = new HierarchyCircuitBreakerService(
Settings.builder()
.put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), BREAKER_LIMIT)
.build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
// we can do this here only because we know that we don't adjust breaker settings dynamically in the test
inFlightRequestsBreaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
HttpServerTransport httpServerTransport = new TestHttpServerTransport();
restController = new RestController(settings, Collections.emptySet(), null, null, circuitBreakerService);
restController.registerHandler(RestRequest.Method.GET, "/",
(request, channel, client) -> channel.sendResponse(
new BytesRestResponse(RestStatus.OK, BytesRestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY)));
restController.registerHandler(RestRequest.Method.GET, "/error", (request, channel, client) -> {
throw new IllegalArgumentException("test error");
});
httpServerTransport.start();
}
public void testApplyRelevantHeaders() throws Exception {
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
Set<String> headers = new HashSet<>(Arrays.asList("header.1", "header.2"));
final RestController restController = new RestController(Settings.EMPTY, headers, null);
final RestController restController = new RestController(Settings.EMPTY, headers, null, null, circuitBreakerService);
restController.registerHandler(RestRequest.Method.GET, "/",
(RestRequest request, RestChannel channel, NodeClient client) -> {
assertEquals("true", threadContext.getHeader("header.1"));
@ -66,7 +110,7 @@ public class RestControllerTests extends ESTestCase {
}
public void testCanTripCircuitBreaker() throws Exception {
RestController controller = new RestController(Settings.EMPTY, Collections.emptySet(), null);
RestController controller = new RestController(Settings.EMPTY, Collections.emptySet(), null, null, circuitBreakerService);
// trip circuit breaker by default
controller.registerHandler(RestRequest.Method.GET, "/trip", new FakeRestHandler(true));
controller.registerHandler(RestRequest.Method.GET, "/do-not-trip", new FakeRestHandler(false));
@ -126,7 +170,8 @@ public class RestControllerTests extends ESTestCase {
assertSame(handler, h);
return (RestRequest request, RestChannel channel, NodeClient client) -> wrapperCalled.set(true);
};
final RestController restController = new RestController(Settings.EMPTY, Collections.emptySet(), wrapper);
final RestController restController = new RestController(Settings.EMPTY, Collections.emptySet(), wrapper, null,
circuitBreakerService);
restController.registerHandler(RestRequest.Method.GET, "/", handler);
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
restController.dispatchRequest(new FakeRestRequest.Builder(xContentRegistry()).build(), null, null, threadContext);
@ -154,4 +199,156 @@ public class RestControllerTests extends ESTestCase {
return canTripCircuitBreaker;
}
}
public void testDispatchRequestAddsAndFreesBytesOnSuccess() {
int contentLength = BREAKER_LIMIT.bytesAsInt();
String content = randomAsciiOfLength(contentLength);
TestRestRequest request = new TestRestRequest("/", content);
AssertingChannel channel = new AssertingChannel(request, true, RestStatus.OK);
restController.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY));
assertEquals(0, inFlightRequestsBreaker.getTrippedCount());
assertEquals(0, inFlightRequestsBreaker.getUsed());
}
public void testDispatchRequestAddsAndFreesBytesOnError() {
int contentLength = BREAKER_LIMIT.bytesAsInt();
String content = randomAsciiOfLength(contentLength);
TestRestRequest request = new TestRestRequest("/error", content);
AssertingChannel channel = new AssertingChannel(request, true, RestStatus.BAD_REQUEST);
restController.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY));
assertEquals(0, inFlightRequestsBreaker.getTrippedCount());
assertEquals(0, inFlightRequestsBreaker.getUsed());
}
public void testDispatchRequestAddsAndFreesBytesOnlyOnceOnError() {
int contentLength = BREAKER_LIMIT.bytesAsInt();
String content = randomAsciiOfLength(contentLength);
// we will produce an error in the rest handler and one more when sending the error response
TestRestRequest request = new TestRestRequest("/error", content);
ExceptionThrowingChannel channel = new ExceptionThrowingChannel(request, true);
restController.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY));
assertEquals(0, inFlightRequestsBreaker.getTrippedCount());
assertEquals(0, inFlightRequestsBreaker.getUsed());
}
public void testDispatchRequestLimitsBytes() {
int contentLength = BREAKER_LIMIT.bytesAsInt() + 1;
String content = randomAsciiOfLength(contentLength);
TestRestRequest request = new TestRestRequest("/", content);
AssertingChannel channel = new AssertingChannel(request, true, RestStatus.SERVICE_UNAVAILABLE);
restController.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY));
assertEquals(1, inFlightRequestsBreaker.getTrippedCount());
assertEquals(0, inFlightRequestsBreaker.getUsed());
}
private static final class TestHttpServerTransport extends AbstractLifecycleComponent implements
HttpServerTransport {
public TestHttpServerTransport() {
super(Settings.EMPTY);
}
@Override
protected void doStart() {
}
@Override
protected void doStop() {
}
@Override
protected void doClose() {
}
@Override
public BoundTransportAddress boundAddress() {
TransportAddress transportAddress = buildNewFakeTransportAddress();
return new BoundTransportAddress(new TransportAddress[] {transportAddress} ,transportAddress);
}
@Override
public HttpInfo info() {
return null;
}
@Override
public HttpStats stats() {
return null;
}
}
private static final class AssertingChannel extends AbstractRestChannel {
private final RestStatus expectedStatus;
protected AssertingChannel(RestRequest request, boolean detailedErrorsEnabled, RestStatus expectedStatus) {
super(request, detailedErrorsEnabled);
this.expectedStatus = expectedStatus;
}
@Override
public void sendResponse(RestResponse response) {
assertEquals(expectedStatus, response.status());
}
}
private static final class ExceptionThrowingChannel extends AbstractRestChannel {
protected ExceptionThrowingChannel(RestRequest request, boolean detailedErrorsEnabled) {
super(request, detailedErrorsEnabled);
}
@Override
public void sendResponse(RestResponse response) {
throw new IllegalStateException("always throwing an exception for testing");
}
}
private static final class TestRestRequest extends RestRequest {
private final BytesReference content;
private TestRestRequest(String path, String content) {
super(NamedXContentRegistry.EMPTY, Collections.emptyMap(), path);
this.content = new BytesArray(content);
}
@Override
public Method method() {
return Method.GET;
}
@Override
public String uri() {
return null;
}
@Override
public boolean hasContent() {
return true;
}
@Override
public BytesReference content() {
return content;
}
@Override
public String header(String name) {
return null;
}
@Override
public Iterable<Map.Entry<String, String>> headers() {
return null;
}
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.rest.action.admin.cluster;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.test.ESTestCase;
@ -43,7 +44,7 @@ public class RestNodesStatsActionTests extends ESTestCase {
@Override
public void setUp() throws Exception {
super.setUp();
action = new RestNodesStatsAction(Settings.EMPTY, new RestController(Settings.EMPTY, Collections.emptySet(), null));
action = new RestNodesStatsAction(Settings.EMPTY, new RestController(Settings.EMPTY, Collections.emptySet(), null, null, null));
}
public void testUnrecognizedMetric() throws IOException {

View File

@ -41,7 +41,7 @@ public class RestIndicesStatsActionTests extends ESTestCase {
@Override
public void setUp() throws Exception {
super.setUp();
action = new RestIndicesStatsAction(Settings.EMPTY, new RestController(Settings.EMPTY, Collections.emptySet(), null));
action = new RestIndicesStatsAction(Settings.EMPTY, new RestController(Settings.EMPTY, Collections.emptySet(), null, null, null));
}
public void testUnrecognizedMetric() throws IOException {

View File

@ -74,7 +74,7 @@ public class RestIndicesActionTests extends ESTestCase {
public void testBuildTable() {
final Settings settings = Settings.EMPTY;
final RestController restController = new RestController(settings, Collections.emptySet(), null);
final RestController restController = new RestController(settings, Collections.emptySet(), null, null, null);
final RestIndicesAction action = new RestIndicesAction(settings, restController, new IndexNameExpressionResolver(settings));
// build a (semi-)random table

View File

@ -50,7 +50,7 @@ public class RestRecoveryActionTests extends ESTestCase {
public void testRestRecoveryAction() {
final Settings settings = Settings.EMPTY;
final RestController restController = new RestController(settings, Collections.emptySet(), null);
final RestController restController = new RestController(settings, Collections.emptySet(), null, null, null);
final RestRecoveryAction action = new RestRecoveryAction(settings, restController, restController);
final int totalShards = randomIntBetween(1, 32);
final int successfulShards = Math.max(0, totalShards - randomIntBetween(1, 2));

View File

@ -63,7 +63,6 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.http.BindHttpException;
import org.elasticsearch.http.HttpInfo;
import org.elasticsearch.http.HttpServerAdapter;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.http.netty4.cors.Netty4CorsConfig;
@ -210,6 +209,7 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
protected final ByteSizeValue maxCumulationBufferCapacity;
protected final int maxCompositeBufferComponents;
private final Dispatcher dispatcher;
protected volatile ServerBootstrap serverBootstrap;
@ -220,17 +220,17 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
// package private for testing
Netty4OpenChannelsHandler serverOpenChannels;
protected volatile HttpServerAdapter httpServerAdapter;
private final Netty4CorsConfig corsConfig;
public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
NamedXContentRegistry xContentRegistry) {
NamedXContentRegistry xContentRegistry, Dispatcher dispatcher) {
super(settings);
this.networkService = networkService;
this.bigArrays = bigArrays;
this.threadPool = threadPool;
this.xContentRegistry = xContentRegistry;
this.dispatcher = dispatcher;
ByteSizeValue maxContentLength = SETTING_HTTP_MAX_CONTENT_LENGTH.get(settings);
this.maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings);
@ -286,11 +286,6 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
return this.settings;
}
@Override
public void httpServerAdapter(HttpServerAdapter httpServerAdapter) {
this.httpServerAdapter = httpServerAdapter;
}
@Override
protected void doStart() {
boolean success = false;
@ -331,6 +326,9 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress);
this.boundAddress = createBoundHttpAddress();
if (logger.isInfoEnabled()) {
logger.info("{}", boundAddress);
}
success = true;
} finally {
if (success == false) {
@ -511,7 +509,7 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
}
protected void dispatchRequest(RestRequest request, RestChannel channel) {
httpServerAdapter.dispatchRequest(request, channel, threadPool.getThreadContext());
dispatcher.dispatch(request, channel, threadPool.getThreadContext());
}
protected void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

View File

@ -93,9 +93,12 @@ public class Netty4Plugin extends Plugin implements NetworkPlugin {
@Override
public Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
CircuitBreakerService circuitBreakerService, NamedWriteableRegistry namedWriteableRegistry,
NamedXContentRegistry xContentRegistry, NetworkService networkService) {
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NamedXContentRegistry xContentRegistry,
NetworkService networkService,
HttpServerTransport.Dispatcher dispatcher) {
return Collections.singletonMap(NETTY_HTTP_TRANSPORT_NAME,
() -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry));
() -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher));
}
}

View File

@ -188,7 +188,8 @@ public class Netty4HttpChannelTests extends ESTestCase {
public void testHeadersSet() {
Settings settings = Settings.builder().build();
try (Netty4HttpServerTransport httpServerTransport =
new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry())) {
new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry(),
(request, channel, context) -> {})) {
httpServerTransport.start();
final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
httpRequest.headers().add(HttpHeaderNames.ORIGIN, "remote");
@ -218,7 +219,8 @@ public class Netty4HttpChannelTests extends ESTestCase {
public void testConnectionClose() throws Exception {
final Settings settings = Settings.builder().build();
try (Netty4HttpServerTransport httpServerTransport =
new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry())) {
new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry(),
(request, channel, context) -> {})) {
httpServerTransport.start();
final FullHttpRequest httpRequest;
final boolean close = randomBoolean();
@ -253,7 +255,8 @@ public class Netty4HttpChannelTests extends ESTestCase {
private FullHttpResponse executeRequest(final Settings settings, final String originValue, final String host) {
// construct request and send it over the transport layer
try (Netty4HttpServerTransport httpServerTransport =
new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry())) {
new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry(),
(request, channel, context) -> {})) {
httpServerTransport.start();
final FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
if (originValue != null) {

View File

@ -160,7 +160,7 @@ public class Netty4HttpServerPipeliningTests extends ESTestCase {
Netty4HttpServerPipeliningTests.this.networkService,
Netty4HttpServerPipeliningTests.this.bigArrays,
Netty4HttpServerPipeliningTests.this.threadPool,
xContentRegistry());
xContentRegistry(), (request, channel, context) -> {});
}
@Override

View File

@ -121,9 +121,8 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
*/
public void testExpectContinueHeader() throws Exception {
try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(Settings.EMPTY, networkService, bigArrays, threadPool,
xContentRegistry())) {
transport.httpServerAdapter((request, channel, context) ->
channel.sendResponse(new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, new BytesArray("done"))));
xContentRegistry(), (request, channel, context) ->
channel.sendResponse(new BytesRestResponse(OK, BytesRestResponse.TEXT_CONTENT_TYPE, new BytesArray("done"))))) {
transport.start();
TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
@ -145,12 +144,12 @@ public class Netty4HttpServerTransportTests extends ESTestCase {
public void testBindUnavailableAddress() {
try (Netty4HttpServerTransport transport = new Netty4HttpServerTransport(Settings.EMPTY, networkService, bigArrays, threadPool,
xContentRegistry())) {
xContentRegistry(), (request, channel, context) -> {})) {
transport.start();
TransportAddress remoteAddress = randomFrom(transport.boundAddress().boundAddresses());
Settings settings = Settings.builder().put("http.port", remoteAddress.getPort()).build();
try (Netty4HttpServerTransport otherTransport = new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool,
xContentRegistry())) {
xContentRegistry(), (request, channel, context) -> {})) {
BindHttpException bindHttpException = expectThrows(BindHttpException.class, () -> otherTransport.start());
assertEquals("Failed to bind to [" + remoteAddress.getPort() + "]", bindHttpException.getMessage());
}

View File

@ -29,18 +29,14 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.discovery.zen.UnicastHostsProvider;
import org.elasticsearch.discovery.zen.ZenPing;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.MockSearchService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.test.discovery.MockZenPing;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;

View File

@ -81,7 +81,7 @@ import org.elasticsearch.indices.analysis.AnalysisModule;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.node.internal.InternalSettingsPreparer;
import org.elasticsearch.node.InternalSettingsPreparer;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;

View File

@ -27,7 +27,6 @@ import com.carrotsearch.randomizedtesting.generators.RandomStrings;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
@ -85,7 +84,7 @@ import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.node.MockNode;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeValidationException;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.node.NodeService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchService;