diff --git a/core/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java b/core/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java index 8347e54d4bd..e700d301644 100644 --- a/core/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java +++ b/core/src/main/java/org/elasticsearch/common/breaker/CircuitBreakingException.java @@ -22,6 +22,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.RestStatus; import java.io.IOException; @@ -66,6 +67,11 @@ public class CircuitBreakingException extends ElasticsearchException { return this.byteLimit; } + @Override + public RestStatus status() { + return RestStatus.SERVICE_UNAVAILABLE; + } + @Override protected void innerToXContent(XContentBuilder builder, Params params) throws IOException { builder.field("bytes_wanted", bytesWanted); diff --git a/core/src/main/java/org/elasticsearch/http/HttpServer.java b/core/src/main/java/org/elasticsearch/http/HttpServer.java index 5ca565e4c88..45abad0fb81 100644 --- a/core/src/main/java/org/elasticsearch/http/HttpServer.java +++ b/core/src/main/java/org/elasticsearch/http/HttpServer.java @@ -19,22 +19,29 @@ package org.elasticsearch.http; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.Streams; +import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.env.Environment; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.node.service.NodeService; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestChannel; import org.elasticsearch.rest.RestController; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; import org.elasticsearch.rest.RestStatus; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.rest.RestStatus.FORBIDDEN; import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR; @@ -43,24 +50,22 @@ import static org.elasticsearch.rest.RestStatus.INTERNAL_SERVER_ERROR; * A component to serve http requests, backed by rest handlers. */ public class HttpServer extends AbstractLifecycleComponent implements HttpServerAdapter { - - private final Environment environment; - private final HttpServerTransport transport; private final RestController restController; private final NodeService nodeService; + private final CircuitBreakerService circuitBreakerService; + @Inject - public HttpServer(Settings settings, Environment environment, HttpServerTransport transport, - RestController restController, - NodeService nodeService) { + public HttpServer(Settings settings, HttpServerTransport transport, RestController restController, NodeService nodeService, + CircuitBreakerService circuitBreakerService) { super(settings); - this.environment = environment; this.transport = transport; this.restController = restController; this.nodeService = nodeService; + this.circuitBreakerService = circuitBreakerService; nodeService.setHttpServer(this); transport.httpServerAdapter(this); } @@ -99,7 +104,15 @@ public class HttpServer extends AbstractLifecycleComponent implement handleFavicon(request, channel); return; } - restController.dispatchRequest(request, channel, threadContext); + RestChannel responseChannel = channel; + try { + inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(request.content().length(), ""); + // iff we could reserve bytes for the request we need to send the response also over this channel + responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService); + restController.dispatchRequest(request, responseChannel, threadContext); + } catch (Throwable t) { + restController.sendErrorResponse(request, responseChannel, t); + } } void handleFavicon(RestRequest request, RestChannel channel) { @@ -118,4 +131,65 @@ public class HttpServer extends AbstractLifecycleComponent implement channel.sendResponse(new BytesRestResponse(FORBIDDEN)); } } + + private static final class ResourceHandlingHttpChannel implements RestChannel { + private final RestChannel delegate; + private final CircuitBreakerService circuitBreakerService; + private final AtomicBoolean closed = new AtomicBoolean(); + + public ResourceHandlingHttpChannel(RestChannel delegate, CircuitBreakerService circuitBreakerService) { + this.delegate = delegate; + this.circuitBreakerService = circuitBreakerService; + } + + @Override + public XContentBuilder newBuilder() throws IOException { + return delegate.newBuilder(); + } + + @Override + public XContentBuilder newErrorBuilder() throws IOException { + return delegate.newErrorBuilder(); + } + + @Override + public XContentBuilder newBuilder(@Nullable BytesReference autoDetectSource, boolean useFiltering) throws IOException { + return delegate.newBuilder(autoDetectSource, useFiltering); + } + + @Override + public BytesStreamOutput bytesOutput() { + return delegate.bytesOutput(); + } + + @Override + public RestRequest request() { + return delegate.request(); + } + + @Override + public boolean detailedErrorsEnabled() { + return delegate.detailedErrorsEnabled(); + } + + @Override + public void sendResponse(RestResponse response) { + close(); + delegate.sendResponse(response); + } + + private void close() { + // attempt to close once atomically + if (closed.compareAndSet(false, true) == false) { + throw new IllegalStateException("Channel is already closed"); + } + inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(-request().content().length()); + } + + } + + private static CircuitBreaker inFlightRequestsBreaker(CircuitBreakerService circuitBreakerService) { + // We always obtain a fresh breaker to reflect changes to the breaker configuration. + return circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS); + } } diff --git a/core/src/main/java/org/elasticsearch/http/netty/HttpRequestHandler.java b/core/src/main/java/org/elasticsearch/http/netty/HttpRequestHandler.java index c140a3be6de..376ca738fab 100644 --- a/core/src/main/java/org/elasticsearch/http/netty/HttpRequestHandler.java +++ b/core/src/main/java/org/elasticsearch/http/netty/HttpRequestHandler.java @@ -61,11 +61,8 @@ public class HttpRequestHandler extends SimpleChannelUpstreamHandler { // the netty HTTP handling always copy over the buffer to its own buffer, either in NioWorker internally // when reading, or using a cumalation buffer NettyHttpRequest httpRequest = new NettyHttpRequest(request, e.getChannel()); - if (oue != null) { - serverTransport.dispatchRequest(httpRequest, new NettyHttpChannel(serverTransport, httpRequest, oue, detailedErrorsEnabled)); - } else { - serverTransport.dispatchRequest(httpRequest, new NettyHttpChannel(serverTransport, httpRequest, detailedErrorsEnabled)); - } + NettyHttpChannel channel = new NettyHttpChannel(serverTransport, httpRequest, oue, detailedErrorsEnabled); + serverTransport.dispatchRequest(httpRequest, channel); super.messageReceived(ctx, e); } diff --git a/core/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java b/core/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java index cd45d259293..a634db247aa 100644 --- a/core/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java +++ b/core/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java @@ -19,6 +19,7 @@ package org.elasticsearch.http.netty; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; @@ -58,19 +59,22 @@ public final class NettyHttpChannel extends AbstractRestChannel { private final NettyHttpServerTransport transport; private final Channel channel; private final org.jboss.netty.handler.codec.http.HttpRequest nettyRequest; - private OrderedUpstreamMessageEvent orderedUpstreamMessageEvent = null; + private final OrderedUpstreamMessageEvent orderedUpstreamMessageEvent; + /** + * @param transport The corresponding NettyHttpServerTransport where this channel belongs to. + * @param request The request that is handled by this channel. + * @param orderedUpstreamMessageEvent If HTTP pipelining is enabled provide the corresponding Netty upstream event. May be null if + * HTTP pipelining is disabled. + * @param detailedErrorsEnabled true iff error messages should include stack traces. + */ public NettyHttpChannel(NettyHttpServerTransport transport, NettyHttpRequest request, + @Nullable OrderedUpstreamMessageEvent orderedUpstreamMessageEvent, boolean detailedErrorsEnabled) { super(request, detailedErrorsEnabled); this.transport = transport; this.channel = request.getChannel(); this.nettyRequest = request.request(); - } - - public NettyHttpChannel(NettyHttpServerTransport transport, NettyHttpRequest request, - OrderedUpstreamMessageEvent orderedUpstreamMessageEvent, boolean detailedErrorsEnabled) { - this(transport, request, detailedErrorsEnabled); this.orderedUpstreamMessageEvent = orderedUpstreamMessageEvent; } diff --git a/core/src/main/java/org/elasticsearch/rest/RestController.java b/core/src/main/java/org/elasticsearch/rest/RestController.java index 0cbfdd0ef1b..6da1a929f7c 100644 --- a/core/src/main/java/org/elasticsearch/rest/RestController.java +++ b/core/src/main/java/org/elasticsearch/rest/RestController.java @@ -158,11 +158,11 @@ public class RestController extends AbstractLifecycleComponent { return new ControllerFilterChain(executionFilter); } - public void dispatchRequest(final RestRequest request, final RestChannel channel, ThreadContext threadContext) { + public void dispatchRequest(final RestRequest request, final RestChannel channel, ThreadContext threadContext) throws Exception { if (!checkRequestParameters(request, channel)) { return; } - try (ThreadContext.StoredContext t = threadContext.stashContext()){ + try (ThreadContext.StoredContext t = threadContext.stashContext()) { for (String key : relevantHeaders) { String httpHeader = request.header(key); if (httpHeader != null) { @@ -170,15 +170,7 @@ public class RestController extends AbstractLifecycleComponent { } } if (filters.length == 0) { - try { - executeHandler(request, channel); - } catch (Throwable e) { - try { - channel.sendResponse(new BytesRestResponse(channel, e)); - } catch (Throwable e1) { - logger.error("failed to send failure response for uri [{}]", e1, request.uri()); - } - } + executeHandler(request, channel); } else { ControllerFilterChain filterChain = new ControllerFilterChain(handlerFilter); filterChain.continueProcessing(request, channel); @@ -186,6 +178,14 @@ public class RestController extends AbstractLifecycleComponent { } } + public void sendErrorResponse(RestRequest request, RestChannel channel, Throwable e) { + try { + channel.sendResponse(new BytesRestResponse(channel, e)); + } catch (Throwable e1) { + logger.error("failed to send failure response for uri [{}]", e1, request.uri()); + } + } + /** * Checks the request parameters against enabled settings for error trace support * @return true if the request does not have any parameters that conflict with system settings diff --git a/core/src/test/java/org/elasticsearch/http/HttpServerTests.java b/core/src/test/java/org/elasticsearch/http/HttpServerTests.java new file mode 100644 index 00000000000..28fc315d3e1 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/http/HttpServerTests.java @@ -0,0 +1,262 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.http; + +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.breaker.CircuitBreaker; +import org.elasticsearch.common.bytes.ByteBufferBytesReference; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.BoundTransportAddress; +import org.elasticsearch.common.transport.LocalTransportAddress; +import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.indices.breaker.CircuitBreakerService; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.node.service.NodeService; +import org.elasticsearch.rest.AbstractRestChannel; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +public class HttpServerTests extends ESTestCase { + private static final ByteSizeValue BREAKER_LIMIT = new ByteSizeValue(20); + private HttpServer httpServer; + private CircuitBreaker inFlightRequestsBreaker; + + @Before + public void setup() { + Settings settings = Settings.EMPTY; + CircuitBreakerService circuitBreakerService = new HierarchyCircuitBreakerService( + Settings.builder() + .put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), BREAKER_LIMIT) + .build(), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); + // we can do this here only because we know that we don't adjust breaker settings dynamically in the test + inFlightRequestsBreaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS); + + HttpServerTransport httpServerTransport = new TestHttpServerTransport(); + RestController restController = new RestController(settings); + restController.registerHandler(RestRequest.Method.GET, "/", + (request, channel) -> channel.sendResponse(new BytesRestResponse(RestStatus.OK))); + restController.registerHandler(RestRequest.Method.GET, "/error", (request, channel) -> { + throw new IllegalArgumentException("test error"); + }); + + ClusterService clusterService = new ClusterService(Settings.EMPTY, null, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, ClusterName.DEFAULT); + NodeService nodeService = new NodeService(Settings.EMPTY, null, null, null, null, null, null, null, null, null, + clusterService, null); + httpServer = new HttpServer(settings, httpServerTransport, restController, nodeService, circuitBreakerService); + httpServer.start(); + } + + public void testDispatchRequestAddsAndFreesBytesOnSuccess() { + int contentLength = BREAKER_LIMIT.bytesAsInt(); + String content = randomAsciiOfLength(contentLength); + TestRestRequest request = new TestRestRequest("/", content); + AssertingChannel channel = new AssertingChannel(request, true, RestStatus.OK); + + httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY)); + + assertEquals(0, inFlightRequestsBreaker.getTrippedCount()); + assertEquals(0, inFlightRequestsBreaker.getUsed()); + } + + public void testDispatchRequestAddsAndFreesBytesOnError() { + int contentLength = BREAKER_LIMIT.bytesAsInt(); + String content = randomAsciiOfLength(contentLength); + TestRestRequest request = new TestRestRequest("/error", content); + AssertingChannel channel = new AssertingChannel(request, true, RestStatus.BAD_REQUEST); + + httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY)); + + assertEquals(0, inFlightRequestsBreaker.getTrippedCount()); + assertEquals(0, inFlightRequestsBreaker.getUsed()); + } + + public void testDispatchRequestAddsAndFreesBytesOnlyOnceOnError() { + int contentLength = BREAKER_LIMIT.bytesAsInt(); + String content = randomAsciiOfLength(contentLength); + // we will produce an error in the rest handler and one more when sending the error response + TestRestRequest request = new TestRestRequest("/error", content); + ExceptionThrowingChannel channel = new ExceptionThrowingChannel(request, true); + + httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY)); + + assertEquals(0, inFlightRequestsBreaker.getTrippedCount()); + assertEquals(0, inFlightRequestsBreaker.getUsed()); + } + + public void testDispatchRequestLimitsBytes() { + int contentLength = BREAKER_LIMIT.bytesAsInt() + 1; + String content = randomAsciiOfLength(contentLength); + TestRestRequest request = new TestRestRequest("/", content); + AssertingChannel channel = new AssertingChannel(request, true, RestStatus.SERVICE_UNAVAILABLE); + + httpServer.dispatchRequest(request, channel, new ThreadContext(Settings.EMPTY)); + + assertEquals(1, inFlightRequestsBreaker.getTrippedCount()); + assertEquals(0, inFlightRequestsBreaker.getUsed()); + } + + private static final class TestHttpServerTransport extends AbstractLifecycleComponent implements + HttpServerTransport { + + public TestHttpServerTransport() { + super(Settings.EMPTY); + } + + @Override + protected void doStart() { + } + + @Override + protected void doStop() { + } + + @Override + protected void doClose() { + } + + @Override + public BoundTransportAddress boundAddress() { + LocalTransportAddress transportAddress = new LocalTransportAddress("1"); + return new BoundTransportAddress(new TransportAddress[] {transportAddress} ,transportAddress); + } + + @Override + public HttpInfo info() { + return null; + } + + @Override + public HttpStats stats() { + return null; + } + + @Override + public void httpServerAdapter(HttpServerAdapter httpServerAdapter) { + + } + } + + private static final class AssertingChannel extends AbstractRestChannel { + private final RestStatus expectedStatus; + + protected AssertingChannel(RestRequest request, boolean detailedErrorsEnabled, RestStatus expectedStatus) { + super(request, detailedErrorsEnabled); + this.expectedStatus = expectedStatus; + } + + @Override + public void sendResponse(RestResponse response) { + assertEquals(expectedStatus, response.status()); + } + } + + private static final class ExceptionThrowingChannel extends AbstractRestChannel { + + protected ExceptionThrowingChannel(RestRequest request, boolean detailedErrorsEnabled) { + super(request, detailedErrorsEnabled); + } + + @Override + public void sendResponse(RestResponse response) { + throw new IllegalStateException("always throwing an exception for testing"); + } + } + + private static final class TestRestRequest extends RestRequest { + private final String path; + private final BytesReference content; + + private TestRestRequest(String path, String content) { + this.path = path; + this.content = new ByteBufferBytesReference(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8))); + } + + @Override + public Method method() { + return Method.GET; + } + + @Override + public String uri() { + return null; + } + + @Override + public String rawPath() { + return path; + } + + @Override + public boolean hasContent() { + return true; + } + + @Override + public BytesReference content() { + return content; + } + + @Override + public String header(String name) { + return null; + } + + @Override + public Iterable> headers() { + return null; + } + + @Override + public boolean hasParam(String key) { + return false; + } + + @Override + public String param(String key) { + return null; + } + + @Override + public String param(String key, String defaultValue) { + return null; + } + + @Override + public Map params() { + return null; + } + } +} diff --git a/core/src/test/java/org/elasticsearch/http/netty/NettyHttpChannelTests.java b/core/src/test/java/org/elasticsearch/http/netty/NettyHttpChannelTests.java index 883caf06a00..ce9051ad189 100644 --- a/core/src/test/java/org/elasticsearch/http/netty/NettyHttpChannelTests.java +++ b/core/src/test/java/org/elasticsearch/http/netty/NettyHttpChannelTests.java @@ -342,6 +342,8 @@ public class NettyHttpChannelTests extends ESTestCase { private HttpHeaders headers = new DefaultHttpHeaders(); + private ChannelBuffer content = ChannelBuffers.EMPTY_BUFFER; + @Override public HttpMethod getMethod() { return null; @@ -379,12 +381,12 @@ public class NettyHttpChannelTests extends ESTestCase { @Override public ChannelBuffer getContent() { - return ChannelBuffers.EMPTY_BUFFER; + return content; } @Override public void setContent(ChannelBuffer content) { - + this.content = content; } @Override diff --git a/core/src/test/java/org/elasticsearch/http/netty/NettyHttpClient.java b/core/src/test/java/org/elasticsearch/http/netty/NettyHttpClient.java index 139e1a0647d..4b04a6259c0 100644 --- a/core/src/test/java/org/elasticsearch/http/netty/NettyHttpClient.java +++ b/core/src/test/java/org/elasticsearch/http/netty/NettyHttpClient.java @@ -18,13 +18,17 @@ */ package org.elasticsearch.http.netty; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.jboss.netty.bootstrap.ClientBootstrap; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; +import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; @@ -33,6 +37,7 @@ import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; import org.jboss.netty.handler.codec.http.DefaultHttpRequest; import org.jboss.netty.handler.codec.http.HttpChunkAggregator; import org.jboss.netty.handler.codec.http.HttpClientCodec; +import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpRequest; import org.jboss.netty.handler.codec.http.HttpResponse; @@ -76,9 +81,34 @@ public class NettyHttpClient implements Closeable { clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory());; } - public synchronized Collection sendRequests(SocketAddress remoteAddress, String... uris) throws InterruptedException { - final CountDownLatch latch = new CountDownLatch(uris.length); - final Collection content = Collections.synchronizedList(new ArrayList(uris.length)); + public Collection get(SocketAddress remoteAddress, String... uris) throws InterruptedException { + Collection requests = new ArrayList<>(uris.length); + for (int i = 0; i < uris.length; i++) { + final HttpRequest httpRequest = new DefaultHttpRequest(HTTP_1_1, HttpMethod.GET, uris[i]); + httpRequest.headers().add(HOST, "localhost"); + httpRequest.headers().add("X-Opaque-ID", String.valueOf(i)); + requests.add(httpRequest); + } + return sendRequests(remoteAddress, requests); + } + + public Collection post(SocketAddress remoteAddress, Tuple... urisAndBodies) throws InterruptedException { + Collection requests = new ArrayList<>(urisAndBodies.length); + for (Tuple uriAndBody : urisAndBodies) { + ChannelBuffer content = ChannelBuffers.copiedBuffer(uriAndBody.v2(), StandardCharsets.UTF_8); + HttpRequest request = new DefaultHttpRequest(HTTP_1_1, HttpMethod.POST, uriAndBody.v1()); + request.headers().add(HOST, "localhost"); + request.headers().add(HttpHeaders.Names.CONTENT_LENGTH, content.readableBytes()); + request.setContent(content); + requests.add(request); + } + return sendRequests(remoteAddress, requests); + } + + private synchronized Collection sendRequests(SocketAddress remoteAddress, Collection requests) + throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(requests.size()); + final Collection content = Collections.synchronizedList(new ArrayList<>(requests.size())); clientBootstrap.setPipelineFactory(new CountDownLatchPipelineFactory(latch, content)); @@ -87,11 +117,8 @@ public class NettyHttpClient implements Closeable { channelFuture = clientBootstrap.connect(remoteAddress); channelFuture.await(1000); - for (int i = 0; i < uris.length; i++) { - final HttpRequest httpRequest = new DefaultHttpRequest(HTTP_1_1, HttpMethod.GET, uris[i]); - httpRequest.headers().add(HOST, "localhost"); - httpRequest.headers().add("X-Opaque-ID", String.valueOf(i)); - channelFuture.getChannel().write(httpRequest); + for (HttpRequest request : requests) { + channelFuture.getChannel().write(request); } latch.await(); diff --git a/core/src/test/java/org/elasticsearch/http/netty/NettyHttpRequestSizeLimitIT.java b/core/src/test/java/org/elasticsearch/http/netty/NettyHttpRequestSizeLimitIT.java new file mode 100644 index 00000000000..71c0bcf3381 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/http/netty/NettyHttpRequestSizeLimitIT.java @@ -0,0 +1,98 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.http.netty; + +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.http.HttpServerTransport; +import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.ESIntegTestCase.ClusterScope; +import org.elasticsearch.test.ESIntegTestCase.Scope; +import org.jboss.netty.handler.codec.http.HttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; + +import java.util.Collection; + +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; + +/** + * + */ +@ClusterScope(scope = Scope.TEST, numDataNodes = 1) +public class NettyHttpRequestSizeLimitIT extends ESIntegTestCase { + private static final ByteSizeValue LIMIT = new ByteSizeValue(1, ByteSizeUnit.KB); + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(NetworkModule.HTTP_ENABLED.getKey(), true) + .put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), LIMIT) + .build(); + } + + public void testLimitsInFlightRequests() throws Exception { + ensureGreen(); + + // we use the limit size as a (very) rough indication on how many requests we should sent to hit the limit + int numRequests = LIMIT.bytesAsInt() / 50; + + StringBuilder bulkRequest = new StringBuilder(); + for (int i = 0; i < numRequests; i++) { + bulkRequest.append("{\"index\": {}}"); + bulkRequest.append(System.lineSeparator()); + bulkRequest.append("{ \"field\" : \"value\" }"); + bulkRequest.append(System.lineSeparator()); + } + + Tuple[] requests = new Tuple[] { + Tuple.tuple("/index/type/_bulk", bulkRequest), + Tuple.tuple("/index/type/_bulk", bulkRequest), + Tuple.tuple("/index/type/_bulk", bulkRequest), + Tuple.tuple("/index/type/_bulk", bulkRequest) + }; + + HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class); + InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress + ().boundAddresses()); + + try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) { + Collection singleResponse = nettyHttpClient.post(inetSocketTransportAddress.address(), requests[0]); + assertThat(singleResponse, hasSize(1)); + assertAtLeastOnceExpectedStatus(singleResponse, HttpResponseStatus.OK); + + @SuppressWarnings("unchecked") + Collection multipleResponses = nettyHttpClient.post(inetSocketTransportAddress.address(), requests); + assertThat(multipleResponses, hasSize(requests.length)); + assertAtLeastOnceExpectedStatus(multipleResponses, HttpResponseStatus.SERVICE_UNAVAILABLE); + } + } + + private void assertAtLeastOnceExpectedStatus(Collection responses, HttpResponseStatus expectedStatus) { + long countResponseErrors = responses.stream().filter(r -> r.getStatus().equals(expectedStatus)).count(); + assertThat(countResponseErrors, greaterThan(0L)); + + } +} diff --git a/core/src/test/java/org/elasticsearch/http/netty/NettyHttpServerPipeliningTests.java b/core/src/test/java/org/elasticsearch/http/netty/NettyHttpServerPipeliningTests.java index e63d45734ee..ece00173627 100644 --- a/core/src/test/java/org/elasticsearch/http/netty/NettyHttpServerPipeliningTests.java +++ b/core/src/test/java/org/elasticsearch/http/netty/NettyHttpServerPipeliningTests.java @@ -101,7 +101,7 @@ public class NettyHttpServerPipeliningTests extends ESTestCase { List requests = Arrays.asList("/firstfast", "/slow?sleep=500", "/secondfast", "/slow?sleep=1000", "/thirdfast"); try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) { - Collection responses = nettyHttpClient.sendRequests(transportAddress.address(), requests.toArray(new String[]{})); + Collection responses = nettyHttpClient.get(transportAddress.address(), requests.toArray(new String[]{})); Collection responseBodies = returnHttpResponseBodies(responses); assertThat(responseBodies, contains("/firstfast", "/slow?sleep=500", "/secondfast", "/slow?sleep=1000", "/thirdfast")); } @@ -118,7 +118,7 @@ public class NettyHttpServerPipeliningTests extends ESTestCase { List requests = Arrays.asList("/slow?sleep=1000", "/firstfast", "/secondfast", "/thirdfast", "/slow?sleep=500"); try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) { - Collection responses = nettyHttpClient.sendRequests(transportAddress.address(), requests.toArray(new String[]{})); + Collection responses = nettyHttpClient.get(transportAddress.address(), requests.toArray(new String[]{})); List responseBodies = new ArrayList<>(returnHttpResponseBodies(responses)); // we cannot be sure about the order of the fast requests, but the slow ones should have to be last assertThat(responseBodies, hasSize(5)); @@ -132,7 +132,9 @@ public class NettyHttpServerPipeliningTests extends ESTestCase { private final ExecutorService executorService; public CustomNettyHttpServerTransport(Settings settings) { - super(settings, NettyHttpServerPipeliningTests.this.networkService, NettyHttpServerPipeliningTests.this.bigArrays, NettyHttpServerPipeliningTests.this.threadPool); + super(settings, NettyHttpServerPipeliningTests.this.networkService, + NettyHttpServerPipeliningTests.this.bigArrays, NettyHttpServerPipeliningTests.this.threadPool + ); this.executorService = Executors.newFixedThreadPool(5); } diff --git a/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIT.java b/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIT.java index 0097ddf2c24..9420f1de928 100644 --- a/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIT.java +++ b/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIT.java @@ -55,7 +55,7 @@ public class NettyPipeliningDisabledIT extends ESIntegTestCase { InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses()); try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) { - Collection responses = nettyHttpClient.sendRequests(inetSocketTransportAddress.address(), requests.toArray(new String[]{})); + Collection responses = nettyHttpClient.get(inetSocketTransportAddress.address(), requests.toArray(new String[]{})); assertThat(responses, hasSize(requests.size())); List opaqueIds = new ArrayList<>(returnOpaqueIds(responses)); diff --git a/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIT.java b/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIT.java index b2f1b8cb592..1eccb946797 100644 --- a/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIT.java +++ b/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIT.java @@ -51,7 +51,7 @@ public class NettyPipeliningEnabledIT extends ESIntegTestCase { InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses()); try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) { - Collection responses = nettyHttpClient.sendRequests(inetSocketTransportAddress.address(), requests.toArray(new String[]{})); + Collection responses = nettyHttpClient.get(inetSocketTransportAddress.address(), requests.toArray(new String[]{})); assertThat(responses, hasSize(5)); Collection opaqueIds = returnOpaqueIds(responses); @@ -68,4 +68,4 @@ public class NettyPipeliningEnabledIT extends ESIntegTestCase { } } -} \ No newline at end of file +} diff --git a/core/src/test/java/org/elasticsearch/rest/RestControllerTests.java b/core/src/test/java/org/elasticsearch/rest/RestControllerTests.java index d6e1a97ac8f..101d2d8e50d 100644 --- a/core/src/test/java/org/elasticsearch/rest/RestControllerTests.java +++ b/core/src/test/java/org/elasticsearch/rest/RestControllerTests.java @@ -69,7 +69,7 @@ public class RestControllerTests extends ESTestCase { assertThat(relevantHeaders, equalTo(headersArray)); } - public void testApplyRelevantHeaders() { + public void testApplyRelevantHeaders() throws Exception { final ThreadContext threadContext = new ThreadContext(Settings.EMPTY); final RestController restController = new RestController(Settings.EMPTY) { @Override diff --git a/core/src/test/java/org/elasticsearch/rest/RestFilterChainTests.java b/core/src/test/java/org/elasticsearch/rest/RestFilterChainTests.java index eeb3d6fbbd7..51f36d1e25f 100644 --- a/core/src/test/java/org/elasticsearch/rest/RestFilterChainTests.java +++ b/core/src/test/java/org/elasticsearch/rest/RestFilterChainTests.java @@ -19,30 +19,25 @@ package org.elasticsearch.rest; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.rest.FakeRestChannel; import org.elasticsearch.test.rest.FakeRestRequest; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.CoreMatchers.equalTo; public class RestFilterChainTests extends ESTestCase { - public void testRestFilters() throws InterruptedException { + public void testRestFilters() throws Exception { RestController restController = new RestController(Settings.EMPTY); @@ -84,7 +79,7 @@ public class RestFilterChainTests extends ESTestCase { }); FakeRestRequest fakeRestRequest = new FakeRestRequest(); - FakeRestChannel fakeRestChannel = new FakeRestChannel(fakeRestRequest, 1); + FakeRestChannel fakeRestChannel = new FakeRestChannel(fakeRestRequest, randomBoolean(), 1); restController.dispatchRequest(fakeRestRequest, fakeRestChannel, new ThreadContext(Settings.EMPTY)); assertThat(fakeRestChannel.await(), equalTo(true)); @@ -118,7 +113,7 @@ public class RestFilterChainTests extends ESTestCase { } } - public void testTooManyContinueProcessing() throws InterruptedException { + public void testTooManyContinueProcessing() throws Exception { final int additionalContinueCount = randomInt(10); @@ -142,65 +137,14 @@ public class RestFilterChainTests extends ESTestCase { }); FakeRestRequest fakeRestRequest = new FakeRestRequest(); - FakeRestChannel fakeRestChannel = new FakeRestChannel(fakeRestRequest, additionalContinueCount + 1); + FakeRestChannel fakeRestChannel = new FakeRestChannel(fakeRestRequest, randomBoolean(), additionalContinueCount + 1); restController.dispatchRequest(fakeRestRequest, fakeRestChannel, new ThreadContext(Settings.EMPTY)); fakeRestChannel.await(); assertThat(testFilter.runs.get(), equalTo(1)); - assertThat(fakeRestChannel.responses.get(), equalTo(1)); - assertThat(fakeRestChannel.errors.get(), equalTo(additionalContinueCount)); - } - - private static class FakeRestChannel extends AbstractRestChannel { - - private final CountDownLatch latch; - AtomicInteger responses = new AtomicInteger(); - AtomicInteger errors = new AtomicInteger(); - - protected FakeRestChannel(RestRequest request, int responseCount) { - super(request, randomBoolean()); - this.latch = new CountDownLatch(responseCount); - } - - @Override - public XContentBuilder newBuilder() throws IOException { - return super.newBuilder(); - } - - @Override - public XContentBuilder newErrorBuilder() throws IOException { - return super.newErrorBuilder(); - } - - @Override - public XContentBuilder newBuilder(@Nullable BytesReference autoDetectSource, boolean useFiltering) throws IOException { - return super.newBuilder(autoDetectSource, useFiltering); - } - - @Override - protected BytesStreamOutput newBytesOutput() { - return super.newBytesOutput(); - } - - @Override - public RestRequest request() { - return super.request(); - } - - @Override - public void sendResponse(RestResponse response) { - if (response.status() == RestStatus.OK) { - responses.incrementAndGet(); - } else { - errors.incrementAndGet(); - } - latch.countDown(); - } - - public boolean await() throws InterruptedException { - return latch.await(10, TimeUnit.SECONDS); - } + assertThat(fakeRestChannel.responses().get(), equalTo(1)); + assertThat(fakeRestChannel.errors().get(), equalTo(additionalContinueCount)); } private static enum Operation implements Callback { diff --git a/core/src/test/java/org/elasticsearch/rest/action/cat/RestRecoveryActionTests.java b/core/src/test/java/org/elasticsearch/rest/action/cat/RestRecoveryActionTests.java index 848c62ab2b4..4e8ea3b3eb0 100644 --- a/core/src/test/java/org/elasticsearch/rest/action/cat/RestRecoveryActionTests.java +++ b/core/src/test/java/org/elasticsearch/rest/action/cat/RestRecoveryActionTests.java @@ -35,7 +35,6 @@ import org.elasticsearch.rest.RestController; import org.elasticsearch.test.ESTestCase; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Locale; diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestChannel.java b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestChannel.java new file mode 100644 index 00000000000..3d1ce291432 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestChannel.java @@ -0,0 +1,91 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.test.rest; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.rest.AbstractRestChannel; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public final class FakeRestChannel extends AbstractRestChannel { + private final CountDownLatch latch; + private final AtomicInteger responses = new AtomicInteger(); + private final AtomicInteger errors = new AtomicInteger(); + + public FakeRestChannel(RestRequest request, boolean detailedErrorsEnabled, int responseCount) { + super(request, detailedErrorsEnabled); + this.latch = new CountDownLatch(responseCount); + } + + @Override + public XContentBuilder newBuilder() throws IOException { + return super.newBuilder(); + } + + @Override + public XContentBuilder newErrorBuilder() throws IOException { + return super.newErrorBuilder(); + } + + @Override + public XContentBuilder newBuilder(@Nullable BytesReference autoDetectSource, boolean useFiltering) throws IOException { + return super.newBuilder(autoDetectSource, useFiltering); + } + + @Override + protected BytesStreamOutput newBytesOutput() { + return super.newBytesOutput(); + } + + @Override + public RestRequest request() { + return super.request(); + } + + @Override + public void sendResponse(RestResponse response) { + if (response.status() == RestStatus.OK) { + responses.incrementAndGet(); + } else { + errors.incrementAndGet(); + } + latch.countDown(); + } + + public boolean await() throws InterruptedException { + return latch.await(10, TimeUnit.SECONDS); + } + + public AtomicInteger responses() { + return responses; + } + + public AtomicInteger errors() { + return errors; + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java index 14075b5254b..66167bc7ec8 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java @@ -31,6 +31,8 @@ public class FakeRestRequest extends RestRequest { private final Map params; + private final BytesReference content; + public FakeRestRequest() { this(new HashMap<>()); } @@ -40,8 +42,13 @@ public class FakeRestRequest extends RestRequest { } public FakeRestRequest(Map headers, Map params) { + this(headers, params, null); + } + + public FakeRestRequest(Map headers, Map params, BytesReference content) { this.headers = headers; this.params = params; + this.content = content; } @Override @@ -61,12 +68,12 @@ public class FakeRestRequest extends RestRequest { @Override public boolean hasContent() { - return false; + return content != null; } @Override public BytesReference content() { - return null; + return content; } @Override