Limit request size on HTTP level

With this commit we limit the size of all in-flight requests on
HTTP level. The size is guarded by the same circuit breaker that
is also used on transport level. Similarly, the size that is used
is HTTP content length.

Relates #16011
This commit is contained in:
Daniel Mitterdorfer 2016-04-13 09:58:08 +02:00
parent 52b2016447
commit 117bc68af3
17 changed files with 627 additions and 114 deletions

View File

@ -22,6 +22,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
@ -66,6 +67,11 @@ public class CircuitBreakingException extends ElasticsearchException {
return this.byteLimit;
}
@Override
public RestStatus status() {
return RestStatus.SERVICE_UNAVAILABLE;
}
@Override
protected void innerToXContent(XContentBuilder builder, Params params) throws IOException {
builder.field("bytes_wanted", bytesWanted);

View File

@ -19,22 +19,29 @@
package org.elasticsearch.http;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.env.Environment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.rest.RestStatus.FORBIDDEN;
import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR;
@ -43,24 +50,22 @@ import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR;
* A component to serve http requests, backed by rest handlers.
*/
public class HttpServer extends AbstractLifecycleComponent<HttpServer> implements HttpServerAdapter {
private final Environment environment;
private final HttpServerTransport transport;
private final RestController restController;
private final NodeService nodeService;
private final CircuitBreakerService circuitBreakerService;
@Inject
public HttpServer(Settings settings, Environment environment, HttpServerTransport transport,
RestController restController,
NodeService nodeService) {
public HttpServer(Settings settings, HttpServerTransport transport, RestController restController, NodeService nodeService,
CircuitBreakerService circuitBreakerService) {
super(settings);
this.environment = environment;
this.transport = transport;
this.restController = restController;
this.nodeService = nodeService;
this.circuitBreakerService = circuitBreakerService;
nodeService.setHttpServer(this);
transport.httpServerAdapter(this);
}
@ -99,7 +104,15 @@ public class HttpServer extends AbstractLifecycleComponent<HttpServer> implement
handleFavicon(request, channel);
return;
}
restController.dispatchRequest(request, channel, threadContext);
RestChannel responseChannel = channel;
try {
inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(request.content().length(), "<http_request>");
// iff we could reserve bytes for the request we need to send the response also over this channel
responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService);
restController.dispatchRequest(request, responseChannel, threadContext);
} catch (Throwable t) {
restController.sendErrorResponse(request, responseChannel, t);
}
}
void handleFavicon(RestRequest request, RestChannel channel) {
@ -118,4 +131,65 @@ public class HttpServer extends AbstractLifecycleComponent<HttpServer> implement
channel.sendResponse(new BytesRestResponse(FORBIDDEN));
}
}
private static final class ResourceHandlingHttpChannel implements RestChannel {
private final RestChannel delegate;
private final CircuitBreakerService circuitBreakerService;
private final AtomicBoolean closed = new AtomicBoolean();
public ResourceHandlingHttpChannel(RestChannel delegate, CircuitBreakerService circuitBreakerService) {
this.delegate = delegate;
this.circuitBreakerService = circuitBreakerService;
}
@Override
public XContentBuilder newBuilder() throws IOException {
return delegate.newBuilder();
}
@Override
public XContentBuilder newErrorBuilder() throws IOException {
return delegate.newErrorBuilder();
}
@Override
public XContentBuilder newBuilder(@Nullable BytesReference autoDetectSource, boolean useFiltering) throws IOException {
return delegate.newBuilder(autoDetectSource, useFiltering);
}
@Override
public BytesStreamOutput bytesOutput() {
return delegate.bytesOutput();
}
@Override
public RestRequest request() {
return delegate.request();
}
@Override
public boolean detailedErrorsEnabled() {
return delegate.detailedErrorsEnabled();
}
@Override
public void sendResponse(RestResponse response) {
close();
delegate.sendResponse(response);
}
private void close() {
// attempt to close once atomically
if (closed.compareAndSet(false, true) == false) {
throw new IllegalStateException("Channel is already closed");
}
inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(-request().content().length());
}
}
private static CircuitBreaker inFlightRequestsBreaker(CircuitBreakerService circuitBreakerService) {
// We always obtain a fresh breaker to reflect changes to the breaker configuration.
return circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
}
}

View File

@ -61,11 +61,8 @@ public class HttpRequestHandler extends SimpleChannelUpstreamHandler {
// the netty HTTP handling always copy over the buffer to its own buffer, either in NioWorker internally
// when reading, or using a cumalation buffer
NettyHttpRequest httpRequest = new NettyHttpRequest(request, e.getChannel());
if (oue != null) {
serverTransport.dispatchRequest(httpRequest, new NettyHttpChannel(serverTransport, httpRequest, oue, detailedErrorsEnabled));
} else {
serverTransport.dispatchRequest(httpRequest, new NettyHttpChannel(serverTransport, httpRequest, detailedErrorsEnabled));
}
NettyHttpChannel channel = new NettyHttpChannel(serverTransport, httpRequest, oue, detailedErrorsEnabled);
serverTransport.dispatchRequest(httpRequest, channel);
super.messageReceived(ctx, e);
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.http.netty;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
@ -58,19 +59,22 @@ public final class NettyHttpChannel extends AbstractRestChannel {
private final NettyHttpServerTransport transport;
private final Channel channel;
private final org.jboss.netty.handler.codec.http.HttpRequest nettyRequest;
private OrderedUpstreamMessageEvent orderedUpstreamMessageEvent = null;
private final OrderedUpstreamMessageEvent orderedUpstreamMessageEvent;
/**
* @param transport The corresponding <code>NettyHttpServerTransport</code> where this channel belongs to.
* @param request The request that is handled by this channel.
* @param orderedUpstreamMessageEvent If HTTP pipelining is enabled provide the corresponding Netty upstream event. May be null if
* HTTP pipelining is disabled.
* @param detailedErrorsEnabled true iff error messages should include stack traces.
*/
public NettyHttpChannel(NettyHttpServerTransport transport, NettyHttpRequest request,
@Nullable OrderedUpstreamMessageEvent orderedUpstreamMessageEvent,
boolean detailedErrorsEnabled) {
super(request, detailedErrorsEnabled);
this.transport = transport;
this.channel = request.getChannel();
this.nettyRequest = request.request();
}
public NettyHttpChannel(NettyHttpServerTransport transport, NettyHttpRequest request,
OrderedUpstreamMessageEvent orderedUpstreamMessageEvent, boolean detailedErrorsEnabled) {
this(transport, request, detailedErrorsEnabled);
this.orderedUpstreamMessageEvent = orderedUpstreamMessageEvent;
}

View File

@ -158,11 +158,11 @@ public class RestController extends AbstractLifecycleComponent<RestController> {
return new ControllerFilterChain(executionFilter);
}
public void dispatchRequest(final RestRequest request, final RestChannel channel, ThreadContext threadContext) {
public void dispatchRequest(final RestRequest request, final RestChannel channel, ThreadContext threadContext) throws Exception {
if (!checkRequestParameters(request, channel)) {
return;
}
try (ThreadContext.StoredContext t = threadContext.stashContext()){
try (ThreadContext.StoredContext t = threadContext.stashContext()) {
for (String key : relevantHeaders) {
String httpHeader = request.header(key);
if (httpHeader != null) {
@ -170,15 +170,7 @@ public class RestController extends AbstractLifecycleComponent<RestController> {
}
}
if (filters.length == 0) {
try {
executeHandler(request, channel);
} catch (Throwable e) {
try {
channel.sendResponse(new BytesRestResponse(channel, e));
} catch (Throwable e1) {
logger.error("failed to send failure response for uri [{}]", e1, request.uri());
}
}
executeHandler(request, channel);
} else {
ControllerFilterChain filterChain = new ControllerFilterChain(handlerFilter);
filterChain.continueProcessing(request, channel);
@ -186,6 +178,14 @@ public class RestController extends AbstractLifecycleComponent<RestController> {
}
}
public void sendErrorResponse(RestRequest request, RestChannel channel, Throwable e) {
try {
channel.sendResponse(new BytesRestResponse(channel, e));
} catch (Throwable e1) {
logger.error("failed to send failure response for uri [{}]", e1, request.uri());
}
}
/**
* Checks the request parameters against enabled settings for error trace support
* @return true if the request does not have any parameters that conflict with system settings

View File

@ -0,0 +1,262 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.ByteBufferBytesReference;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.node.service.NodeService;
import org.elasticsearch.rest.AbstractRestChannel;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
public class HttpServerTests extends ESTestCase {
private static final ByteSizeValue BREAKER_LIMIT = new ByteSizeValue(20);
private HttpServer httpServer;
private CircuitBreaker inFlightRequestsBreaker;
@Before
public void setup() {
Settings settings = Settings.EMPTY;
CircuitBreakerService circuitBreakerService = new HierarchyCircuitBreakerService(
Settings.builder()
.put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), BREAKER_LIMIT)
.build(),
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
// we can do this here only because we know that we don't adjust breaker settings dynamically in the test
inFlightRequestsBreaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
HttpServerTransport httpServerTransport = new TestHttpServerTransport();
RestController restController = new RestController(settings);
restController.registerHandler(RestRequest.Method.GET, "/",
(request, channel) -> channel.sendResponse(new BytesRestResponse(RestStatus.OK)));
restController.registerHandler(RestRequest.Method.GET, "/error", (request, channel) -> {
throw new IllegalArgumentException("test error");
});
ClusterService clusterService = new ClusterService(Settings.EMPTY, null,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, ClusterName.DEFAULT);
NodeService nodeService = new NodeService(Settings.EMPTY, null, null, null, null, null, null, null, null, null,
clusterService, null);
httpServer = new HttpServer(settings, httpServerTransport, restController, nodeService, circuitBreakerService);
httpServer.start();
}
public void testDispatchRequestAddsAndFreesBytesOnSuccess() {
int contentLength = BREAKER_LIMIT.bytesAsInt();
String content = randomAsciiOfLength(contentLength);
TestRestRequest request = new TestRestRequest("/", content);
AssertingChannel channel = new AssertingChannel(request, true, RestStatus.OK);
httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY));
assertEquals(0, inFlightRequestsBreaker.getTrippedCount());
assertEquals(0, inFlightRequestsBreaker.getUsed());
}
public void testDispatchRequestAddsAndFreesBytesOnError() {
int contentLength = BREAKER_LIMIT.bytesAsInt();
String content = randomAsciiOfLength(contentLength);
TestRestRequest request = new TestRestRequest("/error", content);
AssertingChannel channel = new AssertingChannel(request, true, RestStatus.BAD_REQUEST);
httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY));
assertEquals(0, inFlightRequestsBreaker.getTrippedCount());
assertEquals(0, inFlightRequestsBreaker.getUsed());
}
public void testDispatchRequestAddsAndFreesBytesOnlyOnceOnError() {
int contentLength = BREAKER_LIMIT.bytesAsInt();
String content = randomAsciiOfLength(contentLength);
// we will produce an error in the rest handler and one more when sending the error response
TestRestRequest request = new TestRestRequest("/error", content);
ExceptionThrowingChannel channel = new ExceptionThrowingChannel(request, true);
httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY));
assertEquals(0, inFlightRequestsBreaker.getTrippedCount());
assertEquals(0, inFlightRequestsBreaker.getUsed());
}
public void testDispatchRequestLimitsBytes() {
int contentLength = BREAKER_LIMIT.bytesAsInt() + 1;
String content = randomAsciiOfLength(contentLength);
TestRestRequest request = new TestRestRequest("/", content);
AssertingChannel channel = new AssertingChannel(request, true, RestStatus.SERVICE_UNAVAILABLE);
httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY));
assertEquals(1, inFlightRequestsBreaker.getTrippedCount());
assertEquals(0, inFlightRequestsBreaker.getUsed());
}
private static final class TestHttpServerTransport extends AbstractLifecycleComponent<HttpServerTransport> implements
HttpServerTransport {
public TestHttpServerTransport() {
super(Settings.EMPTY);
}
@Override
protected void doStart() {
}
@Override
protected void doStop() {
}
@Override
protected void doClose() {
}
@Override
public BoundTransportAddress boundAddress() {
LocalTransportAddress transportAddress = new LocalTransportAddress("1");
return new BoundTransportAddress(new TransportAddress[] {transportAddress} ,transportAddress);
}
@Override
public HttpInfo info() {
return null;
}
@Override
public HttpStats stats() {
return null;
}
@Override
public void httpServerAdapter(HttpServerAdapter httpServerAdapter) {
}
}
private static final class AssertingChannel extends AbstractRestChannel {
private final RestStatus expectedStatus;
protected AssertingChannel(RestRequest request, boolean detailedErrorsEnabled, RestStatus expectedStatus) {
super(request, detailedErrorsEnabled);
this.expectedStatus = expectedStatus;
}
@Override
public void sendResponse(RestResponse response) {
assertEquals(expectedStatus, response.status());
}
}
private static final class ExceptionThrowingChannel extends AbstractRestChannel {
protected ExceptionThrowingChannel(RestRequest request, boolean detailedErrorsEnabled) {
super(request, detailedErrorsEnabled);
}
@Override
public void sendResponse(RestResponse response) {
throw new IllegalStateException("always throwing an exception for testing");
}
}
private static final class TestRestRequest extends RestRequest {
private final String path;
private final BytesReference content;
private TestRestRequest(String path, String content) {
this.path = path;
this.content = new ByteBufferBytesReference(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8)));
}
@Override
public Method method() {
return Method.GET;
}
@Override
public String uri() {
return null;
}
@Override
public String rawPath() {
return path;
}
@Override
public boolean hasContent() {
return true;
}
@Override
public BytesReference content() {
return content;
}
@Override
public String header(String name) {
return null;
}
@Override
public Iterable<Map.Entry<String, String>> headers() {
return null;
}
@Override
public boolean hasParam(String key) {
return false;
}
@Override
public String param(String key) {
return null;
}
@Override
public String param(String key, String defaultValue) {
return null;
}
@Override
public Map<String, String> params() {
return null;
}
}
}

View File

@ -342,6 +342,8 @@ public class NettyHttpChannelTests extends ESTestCase {
private HttpHeaders headers = new DefaultHttpHeaders();
private ChannelBuffer content = ChannelBuffers.EMPTY_BUFFER;
@Override
public HttpMethod getMethod() {
return null;
@ -379,12 +381,12 @@ public class NettyHttpChannelTests extends ESTestCase {
@Override
public ChannelBuffer getContent() {
return ChannelBuffers.EMPTY_BUFFER;
return content;
}
@Override
public void setContent(ChannelBuffer content) {
this.content = content;
}
@Override

View File

@ -18,13 +18,17 @@
*/
package org.elasticsearch.http.netty;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
@ -33,6 +37,7 @@ import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.http.DefaultHttpRequest;
import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
import org.jboss.netty.handler.codec.http.HttpClientCodec;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponse;
@ -76,9 +81,34 @@ public class NettyHttpClient implements Closeable {
clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory());;
}
public synchronized Collection<HttpResponse> sendRequests(SocketAddress remoteAddress, String... uris) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(uris.length);
final Collection<HttpResponse> content = Collections.synchronizedList(new ArrayList<HttpResponse>(uris.length));
public Collection<HttpResponse> get(SocketAddress remoteAddress, String... uris) throws InterruptedException {
Collection<HttpRequest> requests = new ArrayList<>(uris.length);
for (int i = 0; i < uris.length; i++) {
final HttpRequest httpRequest = new DefaultHttpRequest(HTTP_1_1, HttpMethod.GET, uris[i]);
httpRequest.headers().add(HOST, "localhost");
httpRequest.headers().add("X-Opaque-ID", String.valueOf(i));
requests.add(httpRequest);
}
return sendRequests(remoteAddress, requests);
}
public Collection<HttpResponse> post(SocketAddress remoteAddress, Tuple<String, String>... urisAndBodies) throws InterruptedException {
Collection<HttpRequest> requests = new ArrayList<>(urisAndBodies.length);
for (Tuple<String, String> uriAndBody : urisAndBodies) {
ChannelBuffer content = ChannelBuffers.copiedBuffer(uriAndBody.v2(), StandardCharsets.UTF_8);
HttpRequest request = new DefaultHttpRequest(HTTP_1_1, HttpMethod.POST, uriAndBody.v1());
request.headers().add(HOST, "localhost");
request.headers().add(HttpHeaders.Names.CONTENT_LENGTH, content.readableBytes());
request.setContent(content);
requests.add(request);
}
return sendRequests(remoteAddress, requests);
}
private synchronized Collection<HttpResponse> sendRequests(SocketAddress remoteAddress, Collection<HttpRequest> requests)
throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(requests.size());
final Collection<HttpResponse> content = Collections.synchronizedList(new ArrayList<>(requests.size()));
clientBootstrap.setPipelineFactory(new CountDownLatchPipelineFactory(latch, content));
@ -87,11 +117,8 @@ public class NettyHttpClient implements Closeable {
channelFuture = clientBootstrap.connect(remoteAddress);
channelFuture.await(1000);
for (int i = 0; i < uris.length; i++) {
final HttpRequest httpRequest = new DefaultHttpRequest(HTTP_1_1, HttpMethod.GET, uris[i]);
httpRequest.headers().add(HOST, "localhost");
httpRequest.headers().add("X-Opaque-ID", String.valueOf(i));
channelFuture.getChannel().write(httpRequest);
for (HttpRequest request : requests) {
channelFuture.getChannel().write(request);
}
latch.await();

View File

@ -0,0 +1,98 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http.netty;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import java.util.Collection;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
/**
*
*/
@ClusterScope(scope = Scope.TEST, numDataNodes = 1)
public class NettyHttpRequestSizeLimitIT extends ESIntegTestCase {
private static final ByteSizeValue LIMIT = new ByteSizeValue(1, ByteSizeUnit.KB);
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(NetworkModule.HTTP_ENABLED.getKey(), true)
.put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), LIMIT)
.build();
}
public void testLimitsInFlightRequests() throws Exception {
ensureGreen();
// we use the limit size as a (very) rough indication on how many requests we should sent to hit the limit
int numRequests = LIMIT.bytesAsInt() / 50;
StringBuilder bulkRequest = new StringBuilder();
for (int i = 0; i < numRequests; i++) {
bulkRequest.append("{\"index\": {}}");
bulkRequest.append(System.lineSeparator());
bulkRequest.append("{ \"field\" : \"value\" }");
bulkRequest.append(System.lineSeparator());
}
Tuple[] requests = new Tuple[] {
Tuple.tuple("/index/type/_bulk", bulkRequest),
Tuple.tuple("/index/type/_bulk", bulkRequest),
Tuple.tuple("/index/type/_bulk", bulkRequest),
Tuple.tuple("/index/type/_bulk", bulkRequest)
};
HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress
().boundAddresses());
try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
Collection<HttpResponse> singleResponse = nettyHttpClient.post(inetSocketTransportAddress.address(), requests[0]);
assertThat(singleResponse, hasSize(1));
assertAtLeastOnceExpectedStatus(singleResponse, HttpResponseStatus.OK);
@SuppressWarnings("unchecked")
Collection<HttpResponse> multipleResponses = nettyHttpClient.post(inetSocketTransportAddress.address(), requests);
assertThat(multipleResponses, hasSize(requests.length));
assertAtLeastOnceExpectedStatus(multipleResponses, HttpResponseStatus.SERVICE_UNAVAILABLE);
}
}
private void assertAtLeastOnceExpectedStatus(Collection<HttpResponse> responses, HttpResponseStatus expectedStatus) {
long countResponseErrors = responses.stream().filter(r -> r.getStatus().equals(expectedStatus)).count();
assertThat(countResponseErrors, greaterThan(0L));
}
}

View File

@ -101,7 +101,7 @@ public class NettyHttpServerPipeliningTests extends ESTestCase {
List<String> requests = Arrays.asList("/firstfast", "/slow?sleep=500", "/secondfast", "/slow?sleep=1000", "/thirdfast");
try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
Collection<HttpResponse> responses = nettyHttpClient.sendRequests(transportAddress.address(), requests.toArray(new String[]{}));
Collection<HttpResponse> responses = nettyHttpClient.get(transportAddress.address(), requests.toArray(new String[]{}));
Collection<String> responseBodies = returnHttpResponseBodies(responses);
assertThat(responseBodies, contains("/firstfast", "/slow?sleep=500", "/secondfast", "/slow?sleep=1000", "/thirdfast"));
}
@ -118,7 +118,7 @@ public class NettyHttpServerPipeliningTests extends ESTestCase {
List<String> requests = Arrays.asList("/slow?sleep=1000", "/firstfast", "/secondfast", "/thirdfast", "/slow?sleep=500");
try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
Collection<HttpResponse> responses = nettyHttpClient.sendRequests(transportAddress.address(), requests.toArray(new String[]{}));
Collection<HttpResponse> responses = nettyHttpClient.get(transportAddress.address(), requests.toArray(new String[]{}));
List<String> responseBodies = new ArrayList<>(returnHttpResponseBodies(responses));
// we cannot be sure about the order of the fast requests, but the slow ones should have to be last
assertThat(responseBodies, hasSize(5));
@ -132,7 +132,9 @@ public class NettyHttpServerPipeliningTests extends ESTestCase {
private final ExecutorService executorService;
public CustomNettyHttpServerTransport(Settings settings) {
super(settings, NettyHttpServerPipeliningTests.this.networkService, NettyHttpServerPipeliningTests.this.bigArrays, NettyHttpServerPipeliningTests.this.threadPool);
super(settings, NettyHttpServerPipeliningTests.this.networkService,
NettyHttpServerPipeliningTests.this.bigArrays, NettyHttpServerPipeliningTests.this.threadPool
);
this.executorService = Executors.newFixedThreadPool(5);
}

View File

@ -55,7 +55,7 @@ public class NettyPipeliningDisabledIT extends ESIntegTestCase {
InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses());
try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
Collection<HttpResponse> responses = nettyHttpClient.sendRequests(inetSocketTransportAddress.address(), requests.toArray(new String[]{}));
Collection<HttpResponse> responses = nettyHttpClient.get(inetSocketTransportAddress.address(), requests.toArray(new String[]{}));
assertThat(responses, hasSize(requests.size()));
List<String> opaqueIds = new ArrayList<>(returnOpaqueIds(responses));

View File

@ -51,7 +51,7 @@ public class NettyPipeliningEnabledIT extends ESIntegTestCase {
InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses());
try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
Collection<HttpResponse> responses = nettyHttpClient.sendRequests(inetSocketTransportAddress.address(), requests.toArray(new String[]{}));
Collection<HttpResponse> responses = nettyHttpClient.get(inetSocketTransportAddress.address(), requests.toArray(new String[]{}));
assertThat(responses, hasSize(5));
Collection<String> opaqueIds = returnOpaqueIds(responses);
@ -68,4 +68,4 @@ public class NettyPipeliningEnabledIT extends ESIntegTestCase {
}
}
}
}

View File

@ -69,7 +69,7 @@ public class RestControllerTests extends ESTestCase {
assertThat(relevantHeaders, equalTo(headersArray));
}
public void testApplyRelevantHeaders() {
public void testApplyRelevantHeaders() throws Exception {
final ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
final RestController restController = new RestController(Settings.EMPTY) {
@Override

View File

@ -19,30 +19,25 @@
package org.elasticsearch.rest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.rest.FakeRestChannel;
import org.elasticsearch.test.rest.FakeRestRequest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.CoreMatchers.equalTo;
public class RestFilterChainTests extends ESTestCase {
public void testRestFilters() throws InterruptedException {
public void testRestFilters() throws Exception {
RestController restController = new RestController(Settings.EMPTY);
@ -84,7 +79,7 @@ public class RestFilterChainTests extends ESTestCase {
});
FakeRestRequest fakeRestRequest = new FakeRestRequest();
FakeRestChannel fakeRestChannel = new FakeRestChannel(fakeRestRequest, 1);
FakeRestChannel fakeRestChannel = new FakeRestChannel(fakeRestRequest, randomBoolean(), 1);
restController.dispatchRequest(fakeRestRequest, fakeRestChannel, new ThreadContext(Settings.EMPTY));
assertThat(fakeRestChannel.await(), equalTo(true));
@ -118,7 +113,7 @@ public class RestFilterChainTests extends ESTestCase {
}
}
public void testTooManyContinueProcessing() throws InterruptedException {
public void testTooManyContinueProcessing() throws Exception {
final int additionalContinueCount = randomInt(10);
@ -142,65 +137,14 @@ public class RestFilterChainTests extends ESTestCase {
});
FakeRestRequest fakeRestRequest = new FakeRestRequest();
FakeRestChannel fakeRestChannel = new FakeRestChannel(fakeRestRequest, additionalContinueCount + 1);
FakeRestChannel fakeRestChannel = new FakeRestChannel(fakeRestRequest, randomBoolean(), additionalContinueCount + 1);
restController.dispatchRequest(fakeRestRequest, fakeRestChannel, new ThreadContext(Settings.EMPTY));
fakeRestChannel.await();
assertThat(testFilter.runs.get(), equalTo(1));
assertThat(fakeRestChannel.responses.get(), equalTo(1));
assertThat(fakeRestChannel.errors.get(), equalTo(additionalContinueCount));
}
private static class FakeRestChannel extends AbstractRestChannel {
private final CountDownLatch latch;
AtomicInteger responses = new AtomicInteger();
AtomicInteger errors = new AtomicInteger();
protected FakeRestChannel(RestRequest request, int responseCount) {
super(request, randomBoolean());
this.latch = new CountDownLatch(responseCount);
}
@Override
public XContentBuilder newBuilder() throws IOException {
return super.newBuilder();
}
@Override
public XContentBuilder newErrorBuilder() throws IOException {
return super.newErrorBuilder();
}
@Override
public XContentBuilder newBuilder(@Nullable BytesReference autoDetectSource, boolean useFiltering) throws IOException {
return super.newBuilder(autoDetectSource, useFiltering);
}
@Override
protected BytesStreamOutput newBytesOutput() {
return super.newBytesOutput();
}
@Override
public RestRequest request() {
return super.request();
}
@Override
public void sendResponse(RestResponse response) {
if (response.status() == RestStatus.OK) {
responses.incrementAndGet();
} else {
errors.incrementAndGet();
}
latch.countDown();
}
public boolean await() throws InterruptedException {
return latch.await(10, TimeUnit.SECONDS);
}
assertThat(fakeRestChannel.responses().get(), equalTo(1));
assertThat(fakeRestChannel.errors().get(), equalTo(additionalContinueCount));
}
private static enum Operation implements Callback {

View File

@ -35,7 +35,6 @@ import org.elasticsearch.rest.RestController;
import org.elasticsearch.test.ESTestCase;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;

View File

@ -0,0 +1,91 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.rest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.AbstractRestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public final class FakeRestChannel extends AbstractRestChannel {
private final CountDownLatch latch;
private final AtomicInteger responses = new AtomicInteger();
private final AtomicInteger errors = new AtomicInteger();
public FakeRestChannel(RestRequest request, boolean detailedErrorsEnabled, int responseCount) {
super(request, detailedErrorsEnabled);
this.latch = new CountDownLatch(responseCount);
}
@Override
public XContentBuilder newBuilder() throws IOException {
return super.newBuilder();
}
@Override
public XContentBuilder newErrorBuilder() throws IOException {
return super.newErrorBuilder();
}
@Override
public XContentBuilder newBuilder(@Nullable BytesReference autoDetectSource, boolean useFiltering) throws IOException {
return super.newBuilder(autoDetectSource, useFiltering);
}
@Override
protected BytesStreamOutput newBytesOutput() {
return super.newBytesOutput();
}
@Override
public RestRequest request() {
return super.request();
}
@Override
public void sendResponse(RestResponse response) {
if (response.status() == RestStatus.OK) {
responses.incrementAndGet();
} else {
errors.incrementAndGet();
}
latch.countDown();
}
public boolean await() throws InterruptedException {
return latch.await(10, TimeUnit.SECONDS);
}
public AtomicInteger responses() {
return responses;
}
public AtomicInteger errors() {
return errors;
}
}

View File

@ -31,6 +31,8 @@ public class FakeRestRequest extends RestRequest {
private final Map<String, String> params;
private final BytesReference content;
public FakeRestRequest() {
this(new HashMap<>());
}
@ -40,8 +42,13 @@ public class FakeRestRequest extends RestRequest {
}
public FakeRestRequest(Map<String, String> headers, Map<String, String> params) {
this(headers, params, null);
}
public FakeRestRequest(Map<String, String> headers, Map<String, String> params, BytesReference content) {
this.headers = headers;
this.params = params;
this.content = content;
}
@Override
@ -61,12 +68,12 @@ public class FakeRestRequest extends RestRequest {
@Override
public boolean hasContent() {
return false;
return content != null;
}
@Override
public BytesReference content() {
return null;
return content;
}
@Override