From 117bc68af3987004ade7e56f0aebace72cd1d0ef Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Wed, 13 Apr 2016 09:58:08 +0200 Subject: [PATCH] Limit request size on HTTP level With this commit we limit the size of all in-flight requests on HTTP level. The size is guarded by the same circuit breaker that is also used on transport level. Similarly, the size that is used is HTTP content length. Relates #16011 --- .../breaker/CircuitBreakingException.java | 6 + .../org/elasticsearch/http/HttpServer.java | 92 +++++- .../http/netty/HttpRequestHandler.java | 7 +- .../http/netty/NettyHttpChannel.java | 16 +- .../elasticsearch/rest/RestController.java | 22 +- .../elasticsearch/http/HttpServerTests.java | 262 ++++++++++++++++++ .../http/netty/NettyHttpChannelTests.java | 6 +- .../http/netty/NettyHttpClient.java | 43 ++- .../netty/NettyHttpRequestSizeLimitIT.java | 98 +++++++ .../netty/NettyHttpServerPipeliningTests.java | 8 +- .../http/netty/NettyPipeliningDisabledIT.java | 2 +- .../http/netty/NettyPipeliningEnabledIT.java | 4 +- .../rest/RestControllerTests.java | 2 +- .../rest/RestFilterChainTests.java | 70 +---- .../action/cat/RestRecoveryActionTests.java | 1 - .../test/rest/FakeRestChannel.java | 91 ++++++ .../test/rest/FakeRestRequest.java | 11 +- 17 files changed, 627 insertions(+), 114 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/http/HttpServerTests.java create mode 100644 core/src/test/java/org/elasticsearch/http/netty/NettyHttpRequestSizeLimitIT.java create mode 100644 test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestChannel.java diff --git a/core/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java b/core/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java index 8347e54d4bd..e700d301644 100644 --- a/core/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java +++ b/core/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java @@ -22,6 +22,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.RestStatus; import java.io.IOException; @@ -66,6 +67,11 @@ public class CircuitBreakingException extends ElasticsearchException { return this.byteLimit; } + @Override + public RestStatus status() { + return RestStatus.SERVICE_UNAVAILABLE; + } + @Override protected void innerToXContent(XContentBuilder builder, Params params) throws IOException { builder.field("bytes_wanted", bytesWanted); diff --git a/core/src/main/java/org/elasticsearch/http/HttpServer.java b/core/src/main/java/org/elasticsearch/http/HttpServer.java index 5ca565e4c88..45abad0fb81 100644 --- a/core/src/main/java/org/elasticsearch/http/HttpServer.java +++ b/core/src/main/java/org/elasticsearch/http/HttpServer.java @@ -19,22 +19,29 @@ package org.elasticsearch.http; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.env.Environment; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.node.service.NodeService; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestStatus; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.rest.RestStatus.FORBIDDEN; import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR; @@ -43,24 +50,22 @@ import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR; * A component to serve http requests, backed by rest handlers. */ public class HttpServer extends AbstractLifecycleComponent implements HttpServerAdapter { - - private final Environment environment; - private final HttpServerTransport transport; private final RestController restController; private final NodeService nodeService; + private final CircuitBreakerService circuitBreakerService; + @Inject - public HttpServer(Settings settings, Environment environment, HttpServerTransport transport, - RestController restController, - NodeService nodeService) { + public HttpServer(Settings settings, HttpServerTransport transport, RestController restController, NodeService nodeService, + CircuitBreakerService circuitBreakerService) { super(settings); - this.environment = environment; this.transport = transport; this.restController = restController; this.nodeService = nodeService; + this.circuitBreakerService = circuitBreakerService; nodeService.setHttpServer(this); transport.httpServerAdapter(this); } @@ -99,7 +104,15 @@ public class HttpServer extends AbstractLifecycleComponent implement handleFavicon(request, channel); return; } - restController.dispatchRequest(request, channel, threadContext); + RestChannel responseChannel = channel; + try { + inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(request.content().length(), ""); + // iff we could reserve bytes for the request we need to send the response also over this channel + responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService); + restController.dispatchRequest(request, responseChannel, threadContext); + } catch (Throwable t) { + restController.sendErrorResponse(request, responseChannel, t); + } } void handleFavicon(RestRequest request, RestChannel channel) { @@ -118,4 +131,65 @@ public class HttpServer extends AbstractLifecycleComponent implement channel.sendResponse(new BytesRestResponse(FORBIDDEN)); } } + + private static final class ResourceHandlingHttpChannel implements RestChannel { + private final RestChannel delegate; + private final CircuitBreakerService circuitBreakerService; + private final AtomicBoolean closed = new AtomicBoolean(); + + public ResourceHandlingHttpChannel(RestChannel delegate, CircuitBreakerService circuitBreakerService) { + this.delegate = delegate; + this.circuitBreakerService = circuitBreakerService; + } + + @Override + public XContentBuilder newBuilder() throws IOException { + return delegate.newBuilder(); + } + + @Override + public XContentBuilder newErrorBuilder() throws IOException { + return delegate.newErrorBuilder(); + } + + @Override + public XContentBuilder newBuilder(@Nullable BytesReference autoDetectSource, boolean useFiltering) throws IOException { + return delegate.newBuilder(autoDetectSource, useFiltering); + } + + @Override + public BytesStreamOutput bytesOutput() { + return delegate.bytesOutput(); + } + + @Override + public RestRequest request() { + return delegate.request(); + } + + @Override + public boolean detailedErrorsEnabled() { + return delegate.detailedErrorsEnabled(); + } + + @Override + public void sendResponse(RestResponse response) { + close(); + delegate.sendResponse(response); + } + + private void close() { + // attempt to close once atomically + if (closed.compareAndSet(false, true) == false) { + throw new IllegalStateException("Channel is already closed"); + } + inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(-request().content().length()); + } + + } + + private static CircuitBreaker inFlightRequestsBreaker(CircuitBreakerService circuitBreakerService) { + // We always obtain a fresh breaker to reflect changes to the breaker configuration. + return circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS); + } } diff --git a/core/src/main/java/org/elasticsearch/http/netty/HttpRequestHandler.java b/core/src/main/java/org/elasticsearch/http/netty/HttpRequestHandler.java index c140a3be6de..376ca738fab 100644 --- a/core/src/main/java/org/elasticsearch/http/netty/HttpRequestHandler.java +++ b/core/src/main/java/org/elasticsearch/http/netty/HttpRequestHandler.java @@ -61,11 +61,8 @@ public class HttpRequestHandler extends SimpleChannelUpstreamHandler { // the netty HTTP handling always copy over the buffer to its own buffer, either in NioWorker internally // when reading, or using a cumalation buffer NettyHttpRequest httpRequest = new NettyHttpRequest(request, e.getChannel()); - if (oue != null) { - serverTransport.dispatchRequest(httpRequest, new NettyHttpChannel(serverTransport, httpRequest, oue, detailedErrorsEnabled)); - } else { - serverTransport.dispatchRequest(httpRequest, new NettyHttpChannel(serverTransport, httpRequest, detailedErrorsEnabled)); - } + NettyHttpChannel channel = new NettyHttpChannel(serverTransport, httpRequest, oue, detailedErrorsEnabled); + serverTransport.dispatchRequest(httpRequest, channel); super.messageReceived(ctx, e); } diff --git a/core/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java b/core/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java index cd45d259293..a634db247aa 100644 --- a/core/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java +++ b/core/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java @@ -19,6 +19,7 @@ package org.elasticsearch.http.netty; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; @@ -58,19 +59,22 @@ public final class NettyHttpChannel extends AbstractRestChannel { private final NettyHttpServerTransport transport; private final Channel channel; private final org.jboss.netty.handler.codec.http.HttpRequest nettyRequest; - private OrderedUpstreamMessageEvent orderedUpstreamMessageEvent = null; + private final OrderedUpstreamMessageEvent orderedUpstreamMessageEvent; + /** + * @param transport The corresponding NettyHttpServerTransport where this channel belongs to. + * @param request The request that is handled by this channel. + * @param orderedUpstreamMessageEvent If HTTP pipelining is enabled provide the corresponding Netty upstream event. May be null if + * HTTP pipelining is disabled. + * @param detailedErrorsEnabled true iff error messages should include stack traces. + */ public NettyHttpChannel(NettyHttpServerTransport transport, NettyHttpRequest request, + @Nullable OrderedUpstreamMessageEvent orderedUpstreamMessageEvent, boolean detailedErrorsEnabled) { super(request, detailedErrorsEnabled); this.transport = transport; this.channel = request.getChannel(); this.nettyRequest = request.request(); - } - - public NettyHttpChannel(NettyHttpServerTransport transport, NettyHttpRequest request, - OrderedUpstreamMessageEvent orderedUpstreamMessageEvent, boolean detailedErrorsEnabled) { - this(transport, request, detailedErrorsEnabled); this.orderedUpstreamMessageEvent = orderedUpstreamMessageEvent; } diff --git a/core/src/main/java/org/elasticsearch/rest/RestController.java b/core/src/main/java/org/elasticsearch/rest/RestController.java index 0cbfdd0ef1b..6da1a929f7c 100644 --- a/core/src/main/java/org/elasticsearch/rest/RestController.java +++ b/core/src/main/java/org/elasticsearch/rest/RestController.java @@ -158,11 +158,11 @@ public class RestController extends AbstractLifecycleComponent { return new ControllerFilterChain(executionFilter); } - public void dispatchRequest(final RestRequest request, final RestChannel channel, ThreadContext threadContext) { + public void dispatchRequest(final RestRequest request, final RestChannel channel, ThreadContext threadContext) throws Exception { if (!checkRequestParameters(request, channel)) { return; } - try (ThreadContext.StoredContext t = threadContext.stashContext()){ + try (ThreadContext.StoredContext t = threadContext.stashContext()) { for (String key : relevantHeaders) { String httpHeader = request.header(key); if (httpHeader != null) { @@ -170,15 +170,7 @@ public class RestController extends AbstractLifecycleComponent { } } if (filters.length == 0) { - try { - executeHandler(request, channel); - } catch (Throwable e) { - try { - channel.sendResponse(new BytesRestResponse(channel, e)); - } catch (Throwable e1) { - logger.error("failed to send failure response for uri [{}]", e1, request.uri()); - } - } + executeHandler(request, channel); } else { ControllerFilterChain filterChain = new ControllerFilterChain(handlerFilter); filterChain.continueProcessing(request, channel); @@ -186,6 +178,14 @@ public class RestController extends AbstractLifecycleComponent { } } + public void sendErrorResponse(RestRequest request, RestChannel channel, Throwable e) { + try { + channel.sendResponse(new BytesRestResponse(channel, e)); + } catch (Throwable e1) { + logger.error("failed to send failure response for uri [{}]", e1, request.uri()); + } + } + /** * Checks the request parameters against enabled settings for error trace support * @return true if the request does not have any parameters that conflict with system settings diff --git a/core/src/test/java/org/elasticsearch/http/HttpServerTests.java b/core/src/test/java/org/elasticsearch/http/HttpServerTests.java new file mode 100644 index 00000000000..28fc315d3e1 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/http/HttpServerTests.java @@ -0,0 +1,262 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.http; + +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.bytes.ByteBufferBytesReference; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.BoundTransportAddress; +import org.elasticsearch.common.transport.LocalTransportAddress; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.node.service.NodeService; +import org.elasticsearch.rest.AbstractRestChannel; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +public class HttpServerTests extends ESTestCase { + private static final ByteSizeValue BREAKER_LIMIT = new ByteSizeValue(20); + private HttpServer httpServer; + private CircuitBreaker inFlightRequestsBreaker; + + @Before + public void setup() { + Settings settings = Settings.EMPTY; + CircuitBreakerService circuitBreakerService = new HierarchyCircuitBreakerService( + Settings.builder() + .put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), BREAKER_LIMIT) + .build(), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + // we can do this here only because we know that we don't adjust breaker settings dynamically in the test + inFlightRequestsBreaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS); + + HttpServerTransport httpServerTransport = new TestHttpServerTransport(); + RestController restController = new RestController(settings); + restController.registerHandler(RestRequest.Method.GET, "/", + (request, channel) -> channel.sendResponse(new BytesRestResponse(RestStatus.OK))); + restController.registerHandler(RestRequest.Method.GET, "/error", (request, channel) -> { + throw new IllegalArgumentException("test error"); + }); + + ClusterService clusterService = new ClusterService(Settings.EMPTY, null, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, ClusterName.DEFAULT); + NodeService nodeService = new NodeService(Settings.EMPTY, null, null, null, null, null, null, null, null, null, + clusterService, null); + httpServer = new HttpServer(settings, httpServerTransport, restController, nodeService, circuitBreakerService); + httpServer.start(); + } + + public void testDispatchRequestAddsAndFreesBytesOnSuccess() { + int contentLength = BREAKER_LIMIT.bytesAsInt(); + String content = randomAsciiOfLength(contentLength); + TestRestRequest request = new TestRestRequest("/", content); + AssertingChannel channel = new AssertingChannel(request, true, RestStatus.OK); + + httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY)); + + assertEquals(0, inFlightRequestsBreaker.getTrippedCount()); + assertEquals(0, inFlightRequestsBreaker.getUsed()); + } + + public void testDispatchRequestAddsAndFreesBytesOnError() { + int contentLength = BREAKER_LIMIT.bytesAsInt(); + String content = randomAsciiOfLength(contentLength); + TestRestRequest request = new TestRestRequest("/error", content); + AssertingChannel channel = new AssertingChannel(request, true, RestStatus.BAD_REQUEST); + + httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY)); + + assertEquals(0, inFlightRequestsBreaker.getTrippedCount()); + assertEquals(0, inFlightRequestsBreaker.getUsed()); + } + + public void testDispatchRequestAddsAndFreesBytesOnlyOnceOnError() { + int contentLength = BREAKER_LIMIT.bytesAsInt(); + String content = randomAsciiOfLength(contentLength); + // we will produce an error in the rest handler and one more when sending the error response + TestRestRequest request = new TestRestRequest("/error", content); + ExceptionThrowingChannel channel = new ExceptionThrowingChannel(request, true); + + httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY)); + + assertEquals(0, inFlightRequestsBreaker.getTrippedCount()); + assertEquals(0, inFlightRequestsBreaker.getUsed()); + } + + public void testDispatchRequestLimitsBytes() { + int contentLength = BREAKER_LIMIT.bytesAsInt() + 1; + String content = randomAsciiOfLength(contentLength); + TestRestRequest request = new TestRestRequest("/", content); + AssertingChannel channel = new AssertingChannel(request, true, RestStatus.SERVICE_UNAVAILABLE); + + httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY)); + + assertEquals(1, inFlightRequestsBreaker.getTrippedCount()); + assertEquals(0, inFlightRequestsBreaker.getUsed()); + } + + private static final class TestHttpServerTransport extends AbstractLifecycleComponent implements + HttpServerTransport { + + public TestHttpServerTransport() { + super(Settings.EMPTY); + } + + @Override + protected void doStart() { + } + + @Override + protected void doStop() { + } + + @Override + protected void doClose() { + } + + @Override + public BoundTransportAddress boundAddress() { + LocalTransportAddress transportAddress = new LocalTransportAddress("1"); + return new BoundTransportAddress(new TransportAddress[] {transportAddress} ,transportAddress); + } + + @Override + public HttpInfo info() { + return null; + } + + @Override + public HttpStats stats() { + return null; + } + + @Override + public void httpServerAdapter(HttpServerAdapter httpServerAdapter) { + + } + } + + private static final class AssertingChannel extends AbstractRestChannel { + private final RestStatus expectedStatus; + + protected AssertingChannel(RestRequest request, boolean detailedErrorsEnabled, RestStatus expectedStatus) { + super(request, detailedErrorsEnabled); + this.expectedStatus = expectedStatus; + } + + @Override + public void sendResponse(RestResponse response) { + assertEquals(expectedStatus, response.status()); + } + } + + private static final class ExceptionThrowingChannel extends AbstractRestChannel { + + protected ExceptionThrowingChannel(RestRequest request, boolean detailedErrorsEnabled) { + super(request, detailedErrorsEnabled); + } + + @Override + public void sendResponse(RestResponse response) { + throw new IllegalStateException("always throwing an exception for testing"); + } + } + + private static final class TestRestRequest extends RestRequest { + private final String path; + private final BytesReference content; + + private TestRestRequest(String path, String content) { + this.path = path; + this.content = new ByteBufferBytesReference(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8))); + } + + @Override + public Method method() { + return Method.GET; + } + + @Override + public String uri() { + return null; + } + + @Override + public String rawPath() { + return path; + } + + @Override + public boolean hasContent() { + return true; + } + + @Override + public BytesReference content() { + return content; + } + + @Override + public String header(String name) { + return null; + } + + @Override + public Iterable> headers() { + return null; + } + + @Override + public boolean hasParam(String key) { + return false; + } + + @Override + public String param(String key) { + return null; + } + + @Override + public String param(String key, String defaultValue) { + return null; + } + + @Override + public Map params() { + return null; + } + } +} diff --git a/core/src/test/java/org/elasticsearch/http/netty/NettyHttpChannelTests.java b/core/src/test/java/org/elasticsearch/http/netty/NettyHttpChannelTests.java index 883caf06a00..ce9051ad189 100644 --- a/core/src/test/java/org/elasticsearch/http/netty/NettyHttpChannelTests.java +++ b/core/src/test/java/org/elasticsearch/http/netty/NettyHttpChannelTests.java @@ -342,6 +342,8 @@ public class NettyHttpChannelTests extends ESTestCase { private HttpHeaders headers = new DefaultHttpHeaders(); + private ChannelBuffer content = ChannelBuffers.EMPTY_BUFFER; + @Override public HttpMethod getMethod() { return null; @@ -379,12 +381,12 @@ public class NettyHttpChannelTests extends ESTestCase { @Override public ChannelBuffer getContent() { - return ChannelBuffers.EMPTY_BUFFER; + return content; } @Override public void setContent(ChannelBuffer content) { - + this.content = content; } @Override diff --git a/core/src/test/java/org/elasticsearch/http/netty/NettyHttpClient.java b/core/src/test/java/org/elasticsearch/http/netty/NettyHttpClient.java index 139e1a0647d..4b04a6259c0 100644 --- a/core/src/test/java/org/elasticsearch/http/netty/NettyHttpClient.java +++ b/core/src/test/java/org/elasticsearch/http/netty/NettyHttpClient.java @@ -18,13 +18,17 @@ */ package org.elasticsearch.http.netty; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.jboss.netty.bootstrap.ClientBootstrap; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; @@ -33,6 +37,7 @@ import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.handler.codec.http.DefaultHttpRequest; import org.jboss.netty.handler.codec.http.HttpChunkAggregator; import org.jboss.netty.handler.codec.http.HttpClientCodec; +import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpRequest; import org.jboss.netty.handler.codec.http.HttpResponse; @@ -76,9 +81,34 @@ public class NettyHttpClient implements Closeable { clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory());; } - public synchronized Collection sendRequests(SocketAddress remoteAddress, String... uris) throws InterruptedException { - final CountDownLatch latch = new CountDownLatch(uris.length); - final Collection content = Collections.synchronizedList(new ArrayList(uris.length)); + public Collection get(SocketAddress remoteAddress, String... uris) throws InterruptedException { + Collection requests = new ArrayList<>(uris.length); + for (int i = 0; i < uris.length; i++) { + final HttpRequest httpRequest = new DefaultHttpRequest(HTTP_1_1, HttpMethod.GET, uris[i]); + httpRequest.headers().add(HOST, "localhost"); + httpRequest.headers().add("X-Opaque-ID", String.valueOf(i)); + requests.add(httpRequest); + } + return sendRequests(remoteAddress, requests); + } + + public Collection post(SocketAddress remoteAddress, Tuple... urisAndBodies) throws InterruptedException { + Collection requests = new ArrayList<>(urisAndBodies.length); + for (Tuple uriAndBody : urisAndBodies) { + ChannelBuffer content = ChannelBuffers.copiedBuffer(uriAndBody.v2(), StandardCharsets.UTF_8); + HttpRequest request = new DefaultHttpRequest(HTTP_1_1, HttpMethod.POST, uriAndBody.v1()); + request.headers().add(HOST, "localhost"); + request.headers().add(HttpHeaders.Names.CONTENT_LENGTH, content.readableBytes()); + request.setContent(content); + requests.add(request); + } + return sendRequests(remoteAddress, requests); + } + + private synchronized Collection sendRequests(SocketAddress remoteAddress, Collection requests) + throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(requests.size()); + final Collection content = Collections.synchronizedList(new ArrayList<>(requests.size())); clientBootstrap.setPipelineFactory(new CountDownLatchPipelineFactory(latch, content)); @@ -87,11 +117,8 @@ public class NettyHttpClient implements Closeable { channelFuture = clientBootstrap.connect(remoteAddress); channelFuture.await(1000); - for (int i = 0; i < uris.length; i++) { - final HttpRequest httpRequest = new DefaultHttpRequest(HTTP_1_1, HttpMethod.GET, uris[i]); - httpRequest.headers().add(HOST, "localhost"); - httpRequest.headers().add("X-Opaque-ID", String.valueOf(i)); - channelFuture.getChannel().write(httpRequest); + for (HttpRequest request : requests) { + channelFuture.getChannel().write(request); } latch.await(); diff --git a/core/src/test/java/org/elasticsearch/http/netty/NettyHttpRequestSizeLimitIT.java b/core/src/test/java/org/elasticsearch/http/netty/NettyHttpRequestSizeLimitIT.java new file mode 100644 index 00000000000..71c0bcf3381 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/http/netty/NettyHttpRequestSizeLimitIT.java @@ -0,0 +1,98 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.http.netty; + +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.http.HttpServerTransport; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.ESIntegTestCase.ClusterScope; +import org.elasticsearch.test.ESIntegTestCase.Scope; +import org.jboss.netty.handler.codec.http.HttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; + +import java.util.Collection; + +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; + +/** + * + */ +@ClusterScope(scope = Scope.TEST, numDataNodes = 1) +public class NettyHttpRequestSizeLimitIT extends ESIntegTestCase { + private static final ByteSizeValue LIMIT = new ByteSizeValue(1, ByteSizeUnit.KB); + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(NetworkModule.HTTP_ENABLED.getKey(), true) + .put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), LIMIT) + .build(); + } + + public void testLimitsInFlightRequests() throws Exception { + ensureGreen(); + + // we use the limit size as a (very) rough indication on how many requests we should sent to hit the limit + int numRequests = LIMIT.bytesAsInt() / 50; + + StringBuilder bulkRequest = new StringBuilder(); + for (int i = 0; i < numRequests; i++) { + bulkRequest.append("{\"index\": {}}"); + bulkRequest.append(System.lineSeparator()); + bulkRequest.append("{ \"field\" : \"value\" }"); + bulkRequest.append(System.lineSeparator()); + } + + Tuple[] requests = new Tuple[] { + Tuple.tuple("/index/type/_bulk", bulkRequest), + Tuple.tuple("/index/type/_bulk", bulkRequest), + Tuple.tuple("/index/type/_bulk", bulkRequest), + Tuple.tuple("/index/type/_bulk", bulkRequest) + }; + + HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class); + InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress + ().boundAddresses()); + + try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) { + Collection singleResponse = nettyHttpClient.post(inetSocketTransportAddress.address(), requests[0]); + assertThat(singleResponse, hasSize(1)); + assertAtLeastOnceExpectedStatus(singleResponse, HttpResponseStatus.OK); + + @SuppressWarnings("unchecked") + Collection multipleResponses = nettyHttpClient.post(inetSocketTransportAddress.address(), requests); + assertThat(multipleResponses, hasSize(requests.length)); + assertAtLeastOnceExpectedStatus(multipleResponses, HttpResponseStatus.SERVICE_UNAVAILABLE); + } + } + + private void assertAtLeastOnceExpectedStatus(Collection responses, HttpResponseStatus expectedStatus) { + long countResponseErrors = responses.stream().filter(r -> r.getStatus().equals(expectedStatus)).count(); + assertThat(countResponseErrors, greaterThan(0L)); + + } +} diff --git a/core/src/test/java/org/elasticsearch/http/netty/NettyHttpServerPipeliningTests.java b/core/src/test/java/org/elasticsearch/http/netty/NettyHttpServerPipeliningTests.java index e63d45734ee..ece00173627 100644 --- a/core/src/test/java/org/elasticsearch/http/netty/NettyHttpServerPipeliningTests.java +++ b/core/src/test/java/org/elasticsearch/http/netty/NettyHttpServerPipeliningTests.java @@ -101,7 +101,7 @@ public class NettyHttpServerPipeliningTests extends ESTestCase { List requests = Arrays.asList("/firstfast", "/slow?sleep=500", "/secondfast", "/slow?sleep=1000", "/thirdfast"); try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) { - Collection responses = nettyHttpClient.sendRequests(transportAddress.address(), requests.toArray(new String[]{})); + Collection responses = nettyHttpClient.get(transportAddress.address(), requests.toArray(new String[]{})); Collection responseBodies = returnHttpResponseBodies(responses); assertThat(responseBodies, contains("/firstfast", "/slow?sleep=500", "/secondfast", "/slow?sleep=1000", "/thirdfast")); } @@ -118,7 +118,7 @@ public class NettyHttpServerPipeliningTests extends ESTestCase { List requests = Arrays.asList("/slow?sleep=1000", "/firstfast", "/secondfast", "/thirdfast", "/slow?sleep=500"); try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) { - Collection responses = nettyHttpClient.sendRequests(transportAddress.address(), requests.toArray(new String[]{})); + Collection responses = nettyHttpClient.get(transportAddress.address(), requests.toArray(new String[]{})); List responseBodies = new ArrayList<>(returnHttpResponseBodies(responses)); // we cannot be sure about the order of the fast requests, but the slow ones should have to be last assertThat(responseBodies, hasSize(5)); @@ -132,7 +132,9 @@ public class NettyHttpServerPipeliningTests extends ESTestCase { private final ExecutorService executorService; public CustomNettyHttpServerTransport(Settings settings) { - super(settings, NettyHttpServerPipeliningTests.this.networkService, NettyHttpServerPipeliningTests.this.bigArrays, NettyHttpServerPipeliningTests.this.threadPool); + super(settings, NettyHttpServerPipeliningTests.this.networkService, + NettyHttpServerPipeliningTests.this.bigArrays, NettyHttpServerPipeliningTests.this.threadPool + ); this.executorService = Executors.newFixedThreadPool(5); } diff --git a/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIT.java b/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIT.java index 0097ddf2c24..9420f1de928 100644 --- a/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIT.java +++ b/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIT.java @@ -55,7 +55,7 @@ public class NettyPipeliningDisabledIT extends ESIntegTestCase { InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses()); try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) { - Collection responses = nettyHttpClient.sendRequests(inetSocketTransportAddress.address(), requests.toArray(new String[]{})); + Collection responses = nettyHttpClient.get(inetSocketTransportAddress.address(), requests.toArray(new String[]{})); assertThat(responses, hasSize(requests.size())); List opaqueIds = new ArrayList<>(returnOpaqueIds(responses)); diff --git a/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIT.java b/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIT.java index b2f1b8cb592..1eccb946797 100644 --- a/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIT.java +++ b/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIT.java @@ -51,7 +51,7 @@ public class NettyPipeliningEnabledIT extends ESIntegTestCase { InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses()); try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) { - Collection responses = nettyHttpClient.sendRequests(inetSocketTransportAddress.address(), requests.toArray(new String[]{})); + Collection responses = nettyHttpClient.get(inetSocketTransportAddress.address(), requests.toArray(new String[]{})); assertThat(responses, hasSize(5)); Collection opaqueIds = returnOpaqueIds(responses); @@ -68,4 +68,4 @@ public class NettyPipeliningEnabledIT extends ESIntegTestCase { } } -} \ No newline at end of file +} diff --git a/core/src/test/java/org/elasticsearch/rest/RestControllerTests.java b/core/src/test/java/org/elasticsearch/rest/RestControllerTests.java index d6e1a97ac8f..101d2d8e50d 100644 --- a/core/src/test/java/org/elasticsearch/rest/RestControllerTests.java +++ b/core/src/test/java/org/elasticsearch/rest/RestControllerTests.java @@ -69,7 +69,7 @@ public class RestControllerTests extends ESTestCase { assertThat(relevantHeaders, equalTo(headersArray)); } - public void testApplyRelevantHeaders() { + public void testApplyRelevantHeaders() throws Exception { final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); final RestController restController = new RestController(Settings.EMPTY) { @Override diff --git a/core/src/test/java/org/elasticsearch/rest/RestFilterChainTests.java b/core/src/test/java/org/elasticsearch/rest/RestFilterChainTests.java index eeb3d6fbbd7..51f36d1e25f 100644 --- a/core/src/test/java/org/elasticsearch/rest/RestFilterChainTests.java +++ b/core/src/test/java/org/elasticsearch/rest/RestFilterChainTests.java @@ -19,30 +19,25 @@ package org.elasticsearch.rest; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.rest.FakeRestChannel; import org.elasticsearch.test.rest.FakeRestRequest; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.CoreMatchers.equalTo; public class RestFilterChainTests extends ESTestCase { - public void testRestFilters() throws InterruptedException { + public void testRestFilters() throws Exception { RestController restController = new RestController(Settings.EMPTY); @@ -84,7 +79,7 @@ public class RestFilterChainTests extends ESTestCase { }); FakeRestRequest fakeRestRequest = new FakeRestRequest(); - FakeRestChannel fakeRestChannel = new FakeRestChannel(fakeRestRequest, 1); + FakeRestChannel fakeRestChannel = new FakeRestChannel(fakeRestRequest, randomBoolean(), 1); restController.dispatchRequest(fakeRestRequest, fakeRestChannel, new ThreadContext(Settings.EMPTY)); assertThat(fakeRestChannel.await(), equalTo(true)); @@ -118,7 +113,7 @@ public class RestFilterChainTests extends ESTestCase { } } - public void testTooManyContinueProcessing() throws InterruptedException { + public void testTooManyContinueProcessing() throws Exception { final int additionalContinueCount = randomInt(10); @@ -142,65 +137,14 @@ public class RestFilterChainTests extends ESTestCase { }); FakeRestRequest fakeRestRequest = new FakeRestRequest(); - FakeRestChannel fakeRestChannel = new FakeRestChannel(fakeRestRequest, additionalContinueCount + 1); + FakeRestChannel fakeRestChannel = new FakeRestChannel(fakeRestRequest, randomBoolean(), additionalContinueCount + 1); restController.dispatchRequest(fakeRestRequest, fakeRestChannel, new ThreadContext(Settings.EMPTY)); fakeRestChannel.await(); assertThat(testFilter.runs.get(), equalTo(1)); - assertThat(fakeRestChannel.responses.get(), equalTo(1)); - assertThat(fakeRestChannel.errors.get(), equalTo(additionalContinueCount)); - } - - private static class FakeRestChannel extends AbstractRestChannel { - - private final CountDownLatch latch; - AtomicInteger responses = new AtomicInteger(); - AtomicInteger errors = new AtomicInteger(); - - protected FakeRestChannel(RestRequest request, int responseCount) { - super(request, randomBoolean()); - this.latch = new CountDownLatch(responseCount); - } - - @Override - public XContentBuilder newBuilder() throws IOException { - return super.newBuilder(); - } - - @Override - public XContentBuilder newErrorBuilder() throws IOException { - return super.newErrorBuilder(); - } - - @Override - public XContentBuilder newBuilder(@Nullable BytesReference autoDetectSource, boolean useFiltering) throws IOException { - return super.newBuilder(autoDetectSource, useFiltering); - } - - @Override - protected BytesStreamOutput newBytesOutput() { - return super.newBytesOutput(); - } - - @Override - public RestRequest request() { - return super.request(); - } - - @Override - public void sendResponse(RestResponse response) { - if (response.status() == RestStatus.OK) { - responses.incrementAndGet(); - } else { - errors.incrementAndGet(); - } - latch.countDown(); - } - - public boolean await() throws InterruptedException { - return latch.await(10, TimeUnit.SECONDS); - } + assertThat(fakeRestChannel.responses().get(), equalTo(1)); + assertThat(fakeRestChannel.errors().get(), equalTo(additionalContinueCount)); } private static enum Operation implements Callback { diff --git a/core/src/test/java/org/elasticsearch/rest/action/cat/RestRecoveryActionTests.java b/core/src/test/java/org/elasticsearch/rest/action/cat/RestRecoveryActionTests.java index 848c62ab2b4..4e8ea3b3eb0 100644 --- a/core/src/test/java/org/elasticsearch/rest/action/cat/RestRecoveryActionTests.java +++ b/core/src/test/java/org/elasticsearch/rest/action/cat/RestRecoveryActionTests.java @@ -35,7 +35,6 @@ import org.elasticsearch.rest.RestController; import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Locale; diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestChannel.java b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestChannel.java new file mode 100644 index 00000000000..3d1ce291432 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestChannel.java @@ -0,0 +1,91 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.test.rest; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.AbstractRestChannel; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public final class FakeRestChannel extends AbstractRestChannel { + private final CountDownLatch latch; + private final AtomicInteger responses = new AtomicInteger(); + private final AtomicInteger errors = new AtomicInteger(); + + public FakeRestChannel(RestRequest request, boolean detailedErrorsEnabled, int responseCount) { + super(request, detailedErrorsEnabled); + this.latch = new CountDownLatch(responseCount); + } + + @Override + public XContentBuilder newBuilder() throws IOException { + return super.newBuilder(); + } + + @Override + public XContentBuilder newErrorBuilder() throws IOException { + return super.newErrorBuilder(); + } + + @Override + public XContentBuilder newBuilder(@Nullable BytesReference autoDetectSource, boolean useFiltering) throws IOException { + return super.newBuilder(autoDetectSource, useFiltering); + } + + @Override + protected BytesStreamOutput newBytesOutput() { + return super.newBytesOutput(); + } + + @Override + public RestRequest request() { + return super.request(); + } + + @Override + public void sendResponse(RestResponse response) { + if (response.status() == RestStatus.OK) { + responses.incrementAndGet(); + } else { + errors.incrementAndGet(); + } + latch.countDown(); + } + + public boolean await() throws InterruptedException { + return latch.await(10, TimeUnit.SECONDS); + } + + public AtomicInteger responses() { + return responses; + } + + public AtomicInteger errors() { + return errors; + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java index 14075b5254b..66167bc7ec8 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java @@ -31,6 +31,8 @@ public class FakeRestRequest extends RestRequest { private final Map params; + private final BytesReference content; + public FakeRestRequest() { this(new HashMap<>()); } @@ -40,8 +42,13 @@ public class FakeRestRequest extends RestRequest { } public FakeRestRequest(Map headers, Map params) { + this(headers, params, null); + } + + public FakeRestRequest(Map headers, Map params, BytesReference content) { this.headers = headers; this.params = params; + this.content = content; } @Override @@ -61,12 +68,12 @@ public class FakeRestRequest extends RestRequest { @Override public boolean hasContent() { - return false; + return content != null; } @Override public BytesReference content() { - return null; + return content; } @Override