Exclude admin / diagnostic requests from HTTP request limiting
With this commit we exclude certain HTTP requests that are needed to inspect the cluster from HTTP request limiting to ensure these commands are processed even in critical memory conditions. Relates #17951, relates #18145, closes #18833
This commit is contained in:
parent
a2ad5c0282
commit
f32b700472
|
@ -108,7 +108,11 @@ public class HttpServer extends AbstractLifecycleComponent<HttpServer> implement
|
|||
RestChannel responseChannel = channel;
|
||||
try {
|
||||
int contentLength = request.content().length();
|
||||
inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
|
||||
if (restController.canTripCircuitBreaker(request)) {
|
||||
inFlightRequestsBreaker(circuitBreakerService).addEstimateBytesAndMaybeBreak(contentLength, "<http_request>");
|
||||
} else {
|
||||
inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(contentLength);
|
||||
}
|
||||
// iff we could reserve bytes for the request we need to send the response also over this channel
|
||||
responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength);
|
||||
restController.dispatchRequest(request, responseChannel, threadContext);
|
||||
|
|
|
@ -113,30 +113,14 @@ public class RestController extends AbstractLifecycleComponent<RestController> {
|
|||
}
|
||||
|
||||
/**
|
||||
* Registers a rest handler to be execute when the provided method and path match the request.
|
||||
* Registers a rest handler to be executed when the provided method and path match the request.
|
||||
*/
|
||||
public void registerHandler(RestRequest.Method method, String path, RestHandler handler) {
|
||||
switch (method) {
|
||||
case GET:
|
||||
getHandlers.insert(path, handler);
|
||||
break;
|
||||
case DELETE:
|
||||
deleteHandlers.insert(path, handler);
|
||||
break;
|
||||
case POST:
|
||||
postHandlers.insert(path, handler);
|
||||
break;
|
||||
case PUT:
|
||||
putHandlers.insert(path, handler);
|
||||
break;
|
||||
case OPTIONS:
|
||||
optionsHandlers.insert(path, handler);
|
||||
break;
|
||||
case HEAD:
|
||||
headHandlers.insert(path, handler);
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Can't handle [" + method + "] for path [" + path + "]");
|
||||
PathTrie<RestHandler> handlers = getHandlersForMethod(method);
|
||||
if (handlers != null) {
|
||||
handlers.insert(path, handler);
|
||||
} else {
|
||||
throw new IllegalArgumentException("Can't handle [" + method + "] for path [" + path + "]");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -159,6 +143,15 @@ public class RestController extends AbstractLifecycleComponent<RestController> {
|
|||
return new ControllerFilterChain(executionFilter);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param request The current request. Must not be null.
|
||||
* @return true iff the circuit breaker limit must be enforced for processing this request.
|
||||
*/
|
||||
public boolean canTripCircuitBreaker(RestRequest request) {
|
||||
RestHandler handler = getHandler(request);
|
||||
return (handler != null) ? handler.canTripCircuitBreaker() : true;
|
||||
}
|
||||
|
||||
public void dispatchRequest(final RestRequest request, final RestChannel channel, ThreadContext threadContext) throws Exception {
|
||||
if (!checkRequestParameters(request, channel)) {
|
||||
return;
|
||||
|
@ -226,19 +219,27 @@ public class RestController extends AbstractLifecycleComponent<RestController> {
|
|||
|
||||
private RestHandler getHandler(RestRequest request) {
|
||||
String path = getPath(request);
|
||||
RestRequest.Method method = request.method();
|
||||
PathTrie<RestHandler> handlers = getHandlersForMethod(request.method());
|
||||
if (handlers != null) {
|
||||
return handlers.retrieve(path, request.params());
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private PathTrie<RestHandler> getHandlersForMethod(RestRequest.Method method) {
|
||||
if (method == RestRequest.Method.GET) {
|
||||
return getHandlers.retrieve(path, request.params());
|
||||
return getHandlers;
|
||||
} else if (method == RestRequest.Method.POST) {
|
||||
return postHandlers.retrieve(path, request.params());
|
||||
return postHandlers;
|
||||
} else if (method == RestRequest.Method.PUT) {
|
||||
return putHandlers.retrieve(path, request.params());
|
||||
return putHandlers;
|
||||
} else if (method == RestRequest.Method.DELETE) {
|
||||
return deleteHandlers.retrieve(path, request.params());
|
||||
return deleteHandlers;
|
||||
} else if (method == RestRequest.Method.HEAD) {
|
||||
return headHandlers.retrieve(path, request.params());
|
||||
return headHandlers;
|
||||
} else if (method == RestRequest.Method.OPTIONS) {
|
||||
return optionsHandlers.retrieve(path, request.params());
|
||||
return optionsHandlers;
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -25,4 +25,8 @@ package org.elasticsearch.rest;
|
|||
public interface RestHandler {
|
||||
|
||||
void handleRequest(RestRequest request, RestChannel channel) throws Exception;
|
||||
}
|
||||
|
||||
default boolean canTripCircuitBreaker() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -64,4 +64,9 @@ public class RestClusterHealthAction extends BaseRestHandler {
|
|||
clusterHealthRequest.waitForNodes(request.param("wait_for_nodes", clusterHealthRequest.waitForNodes()));
|
||||
client.admin().cluster().health(clusterHealthRequest, new RestStatusToXContentListener<ClusterHealthResponse>(channel));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canTripCircuitBreaker() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -78,4 +78,9 @@ public class RestNodesHotThreadsAction extends BaseRestHandler {
|
|||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canTripCircuitBreaker() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -103,4 +103,9 @@ public class RestNodesInfoAction extends BaseRestHandler {
|
|||
|
||||
client.admin().cluster().nodesInfo(nodesInfoRequest, new NodesResponseRestListener<>(channel));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canTripCircuitBreaker() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -115,4 +115,9 @@ public class RestNodesStatsAction extends BaseRestHandler {
|
|||
|
||||
client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canTripCircuitBreaker() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -64,4 +64,9 @@ public class RestCancelTasksAction extends BaseRestHandler {
|
|||
ActionListener<CancelTasksResponse> listener = nodeSettingListener(clusterService, new RestToXContentListener<>(channel));
|
||||
client.admin().cluster().cancelTasks(cancelTasksRequest, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canTripCircuitBreaker() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -91,4 +91,9 @@ public class RestListTasksAction extends BaseRestHandler {
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canTripCircuitBreaker() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -72,6 +72,11 @@ public class RestClusterGetSettingsAction extends BaseRestHandler {
|
|||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canTripCircuitBreaker() {
|
||||
return false;
|
||||
}
|
||||
|
||||
private XContentBuilder renderResponse(ClusterState state, boolean renderDefaults, XContentBuilder builder, ToXContent.Params params) throws IOException {
|
||||
builder.startObject();
|
||||
|
||||
|
|
|
@ -76,4 +76,9 @@ public class RestClusterUpdateSettingsAction extends BaseRestHandler {
|
|||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canTripCircuitBreaker() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -96,6 +96,11 @@ public class RestClusterStateAction extends BaseRestHandler {
|
|||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canTripCircuitBreaker() {
|
||||
return false;
|
||||
}
|
||||
|
||||
static final class Fields {
|
||||
static final String CLUSTER_NAME = "cluster_name";
|
||||
}
|
||||
|
|
|
@ -47,4 +47,9 @@ public class RestClusterStatsAction extends BaseRestHandler {
|
|||
clusterStatsRequest.timeout(request.param("timeout"));
|
||||
client.admin().cluster().clusterStats(clusterStatsRequest, new NodesResponseRestListener<>(channel));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canTripCircuitBreaker() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -75,6 +75,11 @@ public class RestClearIndicesCacheAction extends BaseRestHandler {
|
|||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canTripCircuitBreaker() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public static ClearIndicesCacheRequest fromRequest(final RestRequest request, ClearIndicesCacheRequest clearIndicesCacheRequest, ParseFieldMatcher parseFieldMatcher) {
|
||||
|
||||
for (Map.Entry<String, String> entry : request.params().entrySet()) {
|
||||
|
|
|
@ -117,4 +117,9 @@ public class RestIndicesStatsAction extends BaseRestHandler {
|
|||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canTripCircuitBreaker() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -209,6 +209,11 @@ public class ClusterRerouteRequestTests extends ESTestCase {
|
|||
}
|
||||
builder.endObject();
|
||||
|
||||
return new FakeRestRequest(emptyMap(), params, hasBody ? builder.bytes() : null);
|
||||
FakeRestRequest.Builder requestBuilder = new FakeRestRequest.Builder();
|
||||
requestBuilder.withParams(params);
|
||||
if (hasBody) {
|
||||
requestBuilder.withContent(builder.bytes());
|
||||
}
|
||||
return requestBuilder.build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -94,10 +94,22 @@ public class NettyHttpClient implements Closeable {
|
|||
@SafeVarargs // Safe not because it doesn't do anything with the type parameters but because it won't leak them into other methods.
|
||||
public final Collection<HttpResponse> post(SocketAddress remoteAddress, Tuple<String, CharSequence>... urisAndBodies)
|
||||
throws InterruptedException {
|
||||
return processRequestsWithBody(HttpMethod.POST, remoteAddress, urisAndBodies);
|
||||
}
|
||||
|
||||
@SafeVarargs // Safe not because it doesn't do anything with the type parameters but because it won't leak them into other methods.
|
||||
public final Collection<HttpResponse> put(SocketAddress remoteAddress, Tuple<String, CharSequence>... urisAndBodies)
|
||||
throws InterruptedException {
|
||||
return processRequestsWithBody(HttpMethod.PUT, remoteAddress, urisAndBodies);
|
||||
}
|
||||
|
||||
@SafeVarargs // Safe not because it doesn't do anything with the type parameters but because it won't leak them into other methods.
|
||||
private final Collection<HttpResponse> processRequestsWithBody(HttpMethod method, SocketAddress remoteAddress, Tuple<String,
|
||||
CharSequence>... urisAndBodies) throws InterruptedException {
|
||||
Collection<HttpRequest> requests = new ArrayList<>(urisAndBodies.length);
|
||||
for (Tuple<String, CharSequence> uriAndBody : urisAndBodies) {
|
||||
ChannelBuffer content = ChannelBuffers.copiedBuffer(uriAndBody.v2(), StandardCharsets.UTF_8);
|
||||
HttpRequest request = new DefaultHttpRequest(HTTP_1_1, HttpMethod.POST, uriAndBody.v1());
|
||||
HttpRequest request = new DefaultHttpRequest(HTTP_1_1, method, uriAndBody.v1());
|
||||
request.headers().add(HOST, "localhost");
|
||||
request.headers().add(HttpHeaders.Names.CONTENT_LENGTH, content.readableBytes());
|
||||
request.setContent(content);
|
||||
|
|
|
@ -29,12 +29,12 @@ import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
|
|||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
import org.elasticsearch.test.junit.annotations.TestLogging;
|
||||
import org.jboss.netty.handler.codec.http.HttpResponse;
|
||||
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
|
||||
|
@ -89,9 +89,35 @@ public class NettyHttpRequestSizeLimitIT extends ESIntegTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
private void assertAtLeastOnceExpectedStatus(Collection<HttpResponse> responses, HttpResponseStatus expectedStatus) {
|
||||
long countResponseErrors = responses.stream().filter(r -> r.getStatus().equals(expectedStatus)).count();
|
||||
assertThat(countResponseErrors, greaterThan(0L));
|
||||
public void testDoesNotLimitExcludedRequests() throws Exception {
|
||||
ensureGreen();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
Tuple<String, CharSequence>[] requestUris = new Tuple[1500];
|
||||
for (int i = 0; i < requestUris.length; i++) {
|
||||
requestUris[i] = Tuple.tuple("/_cluster/settings",
|
||||
"{ \"transient\": {\"indices.ttl.interval\": \"40s\" } }");
|
||||
}
|
||||
|
||||
HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
|
||||
InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress
|
||||
().boundAddresses());
|
||||
|
||||
try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
|
||||
Collection<HttpResponse> responses = nettyHttpClient.put(inetSocketTransportAddress.address(), requestUris);
|
||||
assertThat(responses, hasSize(requestUris.length));
|
||||
assertAllInExpectedStatus(responses, HttpResponseStatus.OK);
|
||||
}
|
||||
}
|
||||
|
||||
private void assertAtLeastOnceExpectedStatus(Collection<HttpResponse> responses, HttpResponseStatus expectedStatus) {
|
||||
long countExpectedStatus = responses.stream().filter(r -> r.getStatus().equals(expectedStatus)).count();
|
||||
assertThat("Expected at least one request with status [" + expectedStatus + "]", countExpectedStatus, greaterThan(0L));
|
||||
}
|
||||
|
||||
private void assertAllInExpectedStatus(Collection<HttpResponse> responses, HttpResponseStatus expectedStatus) {
|
||||
long countUnexpectedStatus = responses.stream().filter(r -> r.getStatus().equals(expectedStatus) == false).count();
|
||||
assertThat("Expected all requests with status [" + expectedStatus + "] but [" + countUnexpectedStatus +
|
||||
"] requests had a different one", countUnexpectedStatus, equalTo(0L));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -91,9 +91,39 @@ public class RestControllerTests extends ESTestCase {
|
|||
restHeaders.put("header.1", "true");
|
||||
restHeaders.put("header.2", "true");
|
||||
restHeaders.put("header.3", "false");
|
||||
restController.dispatchRequest(new FakeRestRequest(restHeaders), null, threadContext);
|
||||
restController.dispatchRequest(new FakeRestRequest.Builder().withHeaders(restHeaders).build(), null, threadContext);
|
||||
assertNull(threadContext.getHeader("header.1"));
|
||||
assertNull(threadContext.getHeader("header.2"));
|
||||
assertEquals("true", threadContext.getHeader("header.3"));
|
||||
}
|
||||
|
||||
public void testCanTripCircuitBreaker() throws Exception {
|
||||
RestController controller = new RestController(Settings.EMPTY);
|
||||
// trip circuit breaker by default
|
||||
controller.registerHandler(RestRequest.Method.GET, "/trip", new FakeRestHandler(true));
|
||||
controller.registerHandler(RestRequest.Method.GET, "/do-not-trip", new FakeRestHandler(false));
|
||||
|
||||
assertTrue(controller.canTripCircuitBreaker(new FakeRestRequest.Builder().withPath("/trip").build()));
|
||||
// assume trip even on unknown paths
|
||||
assertTrue(controller.canTripCircuitBreaker(new FakeRestRequest.Builder().withPath("/unknown-path").build()));
|
||||
assertFalse(controller.canTripCircuitBreaker(new FakeRestRequest.Builder().withPath("/do-not-trip").build()));
|
||||
}
|
||||
|
||||
private static class FakeRestHandler implements RestHandler {
|
||||
private final boolean canTripCircuitBreaker;
|
||||
|
||||
private FakeRestHandler(boolean canTripCircuitBreaker) {
|
||||
this.canTripCircuitBreaker = canTripCircuitBreaker;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleRequest(RestRequest request, RestChannel channel) throws Exception {
|
||||
//no op
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean canTripCircuitBreaker() {
|
||||
return canTripCircuitBreaker;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,7 +33,6 @@ import org.elasticsearch.rest.RestStatus;
|
|||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.rest.FakeRestRequest;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
|
@ -82,7 +81,7 @@ public class RestMainActionTests extends ESTestCase {
|
|||
if (prettyPrint == false) {
|
||||
params.put("pretty", String.valueOf(prettyPrint));
|
||||
}
|
||||
RestRequest restRequest = new FakeRestRequest(Collections.emptyMap(), params);
|
||||
RestRequest restRequest = new FakeRestRequest.Builder().withParams(params).build();
|
||||
|
||||
BytesRestResponse response = RestMainAction.convertMainResponse(mainResponse, restRequest, builder);
|
||||
assertNotNull(response);
|
||||
|
|
|
@ -146,7 +146,7 @@ public class RestTableTests extends ESTestCase {
|
|||
}
|
||||
|
||||
private RestResponse assertResponseContentType(Map<String, String> headers, String mediaType) throws Exception {
|
||||
FakeRestRequest requestWithAcceptHeader = new FakeRestRequest(headers);
|
||||
FakeRestRequest requestWithAcceptHeader = new FakeRestRequest.Builder().withHeaders(headers).build();
|
||||
table.startRow();
|
||||
table.addCell("foo");
|
||||
table.addCell("foo");
|
||||
|
|
|
@ -33,37 +33,35 @@ public class FakeRestRequest extends RestRequest {
|
|||
|
||||
private final BytesReference content;
|
||||
|
||||
private final Method method;
|
||||
|
||||
private final String path;
|
||||
|
||||
public FakeRestRequest() {
|
||||
this(new HashMap<>());
|
||||
this(new HashMap<>(), new HashMap<>(), null, Method.GET, "/");
|
||||
}
|
||||
|
||||
public FakeRestRequest(Map<String, String> headers) {
|
||||
this(headers, new HashMap<>());
|
||||
}
|
||||
|
||||
public FakeRestRequest(Map<String, String> headers, Map<String, String> params) {
|
||||
this(headers, params, null);
|
||||
}
|
||||
|
||||
public FakeRestRequest(Map<String, String> headers, Map<String, String> params, BytesReference content) {
|
||||
private FakeRestRequest(Map<String, String> headers, Map<String, String> params, BytesReference content, Method method, String path) {
|
||||
this.headers = headers;
|
||||
this.params = params;
|
||||
this.content = content;
|
||||
this.method = method;
|
||||
this.path = path;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Method method() {
|
||||
return Method.GET;
|
||||
return method;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String uri() {
|
||||
return "/";
|
||||
return path;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String rawPath() {
|
||||
return "/";
|
||||
return path;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -109,4 +107,46 @@ public class FakeRestRequest extends RestRequest {
|
|||
public Map<String, String> params() {
|
||||
return params;
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
private Map<String, String> headers = new HashMap<>();
|
||||
|
||||
private Map<String, String> params = new HashMap<>();
|
||||
|
||||
private BytesReference content;
|
||||
|
||||
private String path = "/";
|
||||
|
||||
private Method method = Method.GET;
|
||||
|
||||
public Builder withHeaders(Map<String, String> headers) {
|
||||
this.headers = headers;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withParams(Map<String, String> params) {
|
||||
this.params = params;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withContent(BytesReference content) {
|
||||
this.content = content;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withPath(String path) {
|
||||
this.path = path;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withMethod(Method method) {
|
||||
this.method = method;
|
||||
return this;
|
||||
}
|
||||
|
||||
public FakeRestRequest build() {
|
||||
return new FakeRestRequest(headers, params, content, method, path);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue