Exclude specific transport actions from request size limit check

We add support to explicitly exclude specific transport actions
from the request size limit check.

We also exclude the following request types currently:

*MasterPingRequest
* PingRequest
This commit is contained in:
Daniel Mitterdorfer 2016-05-13 14:21:24 +02:00
parent d3efe37814
commit ddbfda2c68
16 changed files with 188 additions and 37 deletions

View File

@ -54,7 +54,8 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
public TransportClusterHealthAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ClusterName clusterName, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, GatewayAllocator gatewayAllocator) {
super(settings, ClusterHealthAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, ClusterHealthRequest::new);
super(settings, ClusterHealthAction.NAME, false, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, ClusterHealthRequest::new);
this.clusterName = clusterName;
this.gatewayAllocator = gatewayAllocator;
}

View File

@ -47,7 +47,7 @@ public class TransportClusterStateAction extends TransportMasterNodeReadAction<C
@Inject
public TransportClusterStateAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ClusterName clusterName, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, ClusterStateAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, ClusterStateRequest::new);
super(settings, ClusterStateAction.NAME, false, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, ClusterStateRequest::new);
this.clusterName = clusterName;
}

View File

@ -36,9 +36,18 @@ import java.util.function.Supplier;
*/
public abstract class HandledTransportAction<Request extends ActionRequest<Request>, Response extends ActionResponse>
extends TransportAction<Request, Response> {
protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
protected HandledTransportAction(Settings settings, String actionName, ThreadPool threadPool, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<Request> request) {
this(settings, actionName, true, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
}
protected HandledTransportAction(Settings settings, String actionName, boolean canTripCircuitBreaker, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
super(settings, actionName, threadPool, actionFilters, indexNameExpressionResolver, transportService.getTaskManager());
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new TransportHandler());
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, false, canTripCircuitBreaker,
new TransportHandler());
}
class TransportHandler implements TransportRequestHandler<Request> {

View File

@ -58,7 +58,15 @@ public abstract class TransportMasterNodeAction<Request extends MasterNodeReques
protected TransportMasterNodeAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
super(settings, actionName, threadPool, transportService, actionFilters, indexNameExpressionResolver, request);
this(settings, actionName, true, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, request);
}
protected TransportMasterNodeAction(Settings settings, String actionName, boolean canTripCircuitBreaker,
TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<Request> request) {
super(settings, actionName, canTripCircuitBreaker, threadPool, transportService, actionFilters, indexNameExpressionResolver,
request);
this.transportService = transportService;
this.clusterService = clusterService;
this.executor = executor();

View File

@ -46,7 +46,14 @@ public abstract class TransportMasterNodeReadAction<Request extends MasterNodeRe
protected TransportMasterNodeReadAction(Settings settings, String actionName, TransportService transportService,
ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
super(settings, actionName, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,request);
this(settings, actionName, true, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,request);
}
protected TransportMasterNodeReadAction(Settings settings, String actionName, boolean checkSizeLimit, TransportService transportService,
ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<Request> request) {
super(settings, actionName, checkSizeLimit, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver,request);
this.forceLocal = FORCE_LOCAL_SETTING.get(settings);
}

View File

@ -80,7 +80,8 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
this.transportNodeAction = actionName + "[n]";
transportService.registerRequestHandler(transportNodeAction, nodeRequest, nodeExecutor, new NodeTransportHandler());
transportService.registerRequestHandler(
transportNodeAction, nodeRequest, nodeExecutor, new NodeTransportHandler());
}
@Override

View File

@ -92,8 +92,9 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
final private WriteConsistencyLevel defaultWriteConsistencyLevel;
final private TransportRequestOptions transportOptions;
final private String transportReplicaAction;
final private String transportPrimaryAction;
// package private for testing
final String transportReplicaAction;
final String transportPrimaryAction;
final private ReplicasProxy replicasProxy;
protected TransportReplicationAction(Settings settings, String actionName, TransportService transportService,
@ -113,7 +114,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
transportService.registerRequestHandler(transportPrimaryAction, request, executor, new PrimaryOperationTransportHandler());
// we must never reject on because of thread pool capacity on replicas
transportService.registerRequestHandler(transportReplicaAction, replicaRequest, executor, true,
transportService.registerRequestHandler(transportReplicaAction, replicaRequest, executor, true, true,
new ReplicaOperationTransportHandler());
this.transportOptions = transportOptions();

View File

@ -201,7 +201,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
this.joinThreadControl = new JoinThreadControl(threadPool);
transportService.registerRequestHandler(DISCOVERY_REJOIN_ACTION_NAME, RejoinClusterRequest::new, ThreadPool.Names.SAME, new RejoinClusterRequestHandler());
transportService.registerRequestHandler(
DISCOVERY_REJOIN_ACTION_NAME, RejoinClusterRequest::new, ThreadPool.Names.SAME, new RejoinClusterRequestHandler());
}
@Override

View File

@ -81,7 +81,8 @@ public class MasterFaultDetection extends FaultDetection {
logger.debug("[master] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount);
transportService.registerRequestHandler(MASTER_PING_ACTION_NAME, MasterPingRequest::new, ThreadPool.Names.SAME, new MasterPingRequestHandler());
transportService.registerRequestHandler(
MASTER_PING_ACTION_NAME, MasterPingRequest::new, ThreadPool.Names.SAME, false, false, new MasterPingRequestHandler());
}
public DiscoveryNode masterNode() {

View File

@ -72,7 +72,8 @@ public class NodesFaultDetection extends FaultDetection {
logger.debug("[node ] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout, pingRetryCount);
transportService.registerRequestHandler(PING_ACTION_NAME, PingRequest::new, ThreadPool.Names.SAME, new PingRequestHandler());
transportService.registerRequestHandler(
PING_ACTION_NAME, PingRequest::new, ThreadPool.Names.SAME, false, false, new PingRequestHandler());
}
public void setLocalNode(DiscoveryNode localNode) {

View File

@ -33,17 +33,20 @@ public class RequestHandlerRegistry<Request extends TransportRequest> {
private final String action;
private final TransportRequestHandler<Request> handler;
private final boolean forceExecution;
private final boolean canTripCircuitBreaker;
private final String executor;
private final Supplier<Request> requestFactory;
private final TaskManager taskManager;
public RequestHandlerRegistry(String action, Supplier<Request> requestFactory, TaskManager taskManager,
TransportRequestHandler<Request> handler, String executor, boolean forceExecution) {
TransportRequestHandler<Request> handler, String executor, boolean forceExecution,
boolean canTripCircuitBreaker) {
this.action = action;
this.requestFactory = requestFactory;
assert newRequest() != null;
this.handler = handler;
this.forceExecution = forceExecution;
this.canTripCircuitBreaker = canTripCircuitBreaker;
this.executor = executor;
this.taskManager = taskManager;
}
@ -77,6 +80,10 @@ public class RequestHandlerRegistry<Request extends TransportRequest> {
return forceExecution;
}
public boolean canTripCircuitBreaker() {
return canTripCircuitBreaker;
}
public String getExecutor() {
return executor;
}

View File

@ -580,23 +580,27 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
*/
public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> requestFactory, String executor,
TransportRequestHandler<Request> handler) {
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(action, requestFactory, taskManager, handler, executor, false);
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
action, requestFactory, taskManager, handler, executor, false, true);
registerRequestHandler(reg);
}
/**
* Registers a new request handler
*
* @param action The action the request handler is associated with
* @param request The request class that will be used to constrcut new instances for streaming
* @param executor The executor the request handling will be executed on
* @param forceExecution Force execution on the executor queue and never reject it
* @param handler The handler itself that implements the request handling
* @param action The action the request handler is associated with
* @param request The request class that will be used to constrcut new instances for streaming
* @param executor The executor the request handling will be executed on
* @param forceExecution Force execution on the executor queue and never reject it
* @param canTripCircuitBreaker Check the request size and raise an exception in case the limit is breached.
* @param handler The handler itself that implements the request handling
*/
public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> request,
String executor, boolean forceExecution,
boolean canTripCircuitBreaker,
TransportRequestHandler<Request> handler) {
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(action, request, taskManager, handler, executor, forceExecution);
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
action, request, taskManager, handler, executor, forceExecution, canTripCircuitBreaker);
registerRequestHandler(reg);
}

View File

@ -297,12 +297,16 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
Version version) throws Exception {
stream = new NamedWriteableAwareStreamInput(stream, namedWriteableRegistry);
final String action = stream.readString();
final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);
transportServiceAdapter.onRequestReceived(requestId, action);
inFlightRequestsBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_request>");
if (reg != null && reg.canTripCircuitBreaker()) {
inFlightRequestsBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_request>");
} else {
inFlightRequestsBreaker().addWithoutBreaking(messageLengthBytes);
}
final LocalTransportChannel transportChannel = new LocalTransportChannel(this, transportServiceAdapter, sourceTransport, action,
requestId, version, messageLengthBytes);
try {
final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);
if (reg == null) {
throw new ActionNotFoundTransportException("Action [" + action + "] not found");
}

View File

@ -226,14 +226,17 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
transportServiceAdapter.onRequestReceived(requestId, action);
NettyTransportChannel transportChannel = null;
try {
transport.inFlightRequestsBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_request>");
transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel,
requestId, version, profileName, messageLengthBytes);
final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);
if (reg == null) {
throw new ActionNotFoundTransportException(action);
}
if (reg.canTripCircuitBreaker()) {
transport.inFlightRequestsBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_request>");
} else {
transport.inFlightRequestsBreaker().addWithoutBreaking(messageLengthBytes);
}
transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel,
requestId, version, profileName, messageLengthBytes);
final TransportRequest request = reg.newRequest();
request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
request.readFrom(buffer);

View File

@ -20,40 +20,53 @@
package org.elasticsearch.discovery;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.discovery.zen.fd.FaultDetection;
import org.elasticsearch.discovery.zen.fd.MasterFaultDetection;
import org.elasticsearch.discovery.zen.fd.NodesFaultDetection;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportConnectionListener;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.local.LocalTransport;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;
import static org.elasticsearch.cluster.service.ClusterServiceUtils.createClusterService;
import static org.elasticsearch.cluster.service.ClusterServiceUtils.setState;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
public class ZenFaultDetectionTests extends ESTestCase {
protected ThreadPool threadPool;
protected ClusterService clusterService;
protected ClusterService clusterServiceA;
protected ClusterService clusterServiceB;
private CircuitBreakerService circuitBreakerService;
protected static final Version version0 = Version.fromId(/*0*/99);
@ -68,9 +81,14 @@ public class ZenFaultDetectionTests extends ESTestCase {
@Before
public void setUp() throws Exception {
super.setUp();
Settings settings = Settings.builder()
.put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), new ByteSizeValue(0))
.build();
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
threadPool = new ThreadPool(getClass().getName());
clusterService = createClusterService(threadPool);
circuitBreakerService = new NoneCircuitBreakerService();
clusterServiceA = createClusterService(threadPool);
clusterServiceB = createClusterService(threadPool);
circuitBreakerService = new HierarchyCircuitBreakerService(settings, clusterSettings);
serviceA = build(Settings.builder().put("name", "TS_A").build(), version0);
nodeA = new DiscoveryNode("TS_A", "TS_A", serviceA.boundAddress().publishAddress(), emptyMap(), emptySet(), version0);
serviceB = build(Settings.builder().put("name", "TS_B").build(), version1);
@ -109,7 +127,8 @@ public class ZenFaultDetectionTests extends ESTestCase {
super.tearDown();
serviceA.close();
serviceB.close();
clusterService.close();
clusterServiceA.close();
clusterServiceB.close();
terminate(threadPool);
}
@ -117,7 +136,10 @@ public class ZenFaultDetectionTests extends ESTestCase {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
MockTransportService transportService =
new MockTransportService(
Settings.EMPTY,
Settings.builder()
// trace zenfd actions but keep the default otherwise
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), singleton(TransportLivenessAction.NAME))
.build(),
new LocalTransport(settings, threadPool, version, namedWriteableRegistry, circuitBreakerService),
threadPool,
ClusterName.DEFAULT);
@ -183,6 +205,9 @@ public class ZenFaultDetectionTests extends ESTestCase {
serviceB.stop();
notified.await(30, TimeUnit.SECONDS);
CircuitBreaker inFlightRequestsBreaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
assertThat(inFlightRequestsBreaker.getTrippedCount(), equalTo(0L));
assertEquals(nodeB, failureNode[0]);
Matcher<String> matcher = Matchers.containsString("verified");
if (!shouldRetry) {
@ -200,9 +225,9 @@ public class ZenFaultDetectionTests extends ESTestCase {
.put(FaultDetection.PING_INTERVAL_SETTING.getKey(), "5m");
ClusterName clusterName = new ClusterName(randomAsciiOfLengthBetween(3, 20));
final ClusterState state = ClusterState.builder(clusterName).nodes(buildNodesForA(false)).build();
setState(clusterService, state);
setState(clusterServiceA, state);
MasterFaultDetection masterFD = new MasterFaultDetection(settings.build(), threadPool, serviceA, clusterName,
clusterService);
clusterServiceA);
masterFD.start(nodeB, "test");
final String[] failureReason = new String[1];
@ -217,6 +242,9 @@ public class ZenFaultDetectionTests extends ESTestCase {
serviceB.stop();
notified.await(30, TimeUnit.SECONDS);
CircuitBreaker inFlightRequestsBreaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
assertThat(inFlightRequestsBreaker.getTrippedCount(), equalTo(0L));
assertEquals(nodeB, failureNode[0]);
Matcher<String> matcher = Matchers.containsString("verified");
if (!shouldRetry) {
@ -225,4 +253,79 @@ public class ZenFaultDetectionTests extends ESTestCase {
assertThat(failureReason[0], matcher);
}
public void testMasterFaultDetectionNotSizeLimited() throws InterruptedException {
Settings.Builder settings = Settings.builder();
boolean shouldRetry = randomBoolean();
settings
.put(FaultDetection.CONNECT_ON_NETWORK_DISCONNECT_SETTING.getKey(), shouldRetry)
.put(FaultDetection.PING_INTERVAL_SETTING.getKey(), "1s");
ClusterName clusterName = new ClusterName(randomAsciiOfLengthBetween(3, 20));
final ClusterState stateNodeA = ClusterState.builder(clusterName).nodes(buildNodesForA(false)).build();
setState(clusterServiceA, stateNodeA);
int minExpectedPings = 2;
PingProbe pingProbeA = new PingProbe(minExpectedPings);
PingProbe pingProbeB = new PingProbe(minExpectedPings);
serviceA.addTracer(pingProbeA);
serviceB.addTracer(pingProbeB);
MasterFaultDetection masterFDNodeA = new MasterFaultDetection(settings.build(), threadPool, serviceA, clusterName,
clusterServiceA);
masterFDNodeA.start(nodeB, "test");
final ClusterState stateNodeB = ClusterState.builder(clusterName).nodes(buildNodesForB(true)).build();
setState(clusterServiceB, stateNodeB);
MasterFaultDetection masterFDNodeB = new MasterFaultDetection(settings.build(), threadPool, serviceB, clusterName,
clusterServiceB);
masterFDNodeB.start(nodeB, "test");
// let's do a few pings
pingProbeA.awaitMinCompletedPings();
pingProbeB.awaitMinCompletedPings();
CircuitBreaker inFlightRequestsBreaker = circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
assertThat(inFlightRequestsBreaker.getTrippedCount(), equalTo(0L));
assertThat(pingProbeA.completedPings(), greaterThanOrEqualTo(minExpectedPings));
assertThat(pingProbeB.completedPings(), greaterThanOrEqualTo(minExpectedPings));
}
private static class PingProbe extends MockTransportService.Tracer {
private final Set<Tuple<DiscoveryNode, Long>> inflightPings = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<Tuple<DiscoveryNode, Long>> completedPings = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final CountDownLatch waitForPings;
public PingProbe(int minCompletedPings) {
this.waitForPings = new CountDownLatch(minCompletedPings);
}
@Override
public void requestSent(DiscoveryNode node, long requestId, String action, TransportRequestOptions options) {
if (MasterFaultDetection.MASTER_PING_ACTION_NAME.equals(action)) {
inflightPings.add(Tuple.tuple(node, requestId));
}
}
@Override
public void receivedResponse(long requestId, DiscoveryNode sourceNode, String action) {
if (MasterFaultDetection.MASTER_PING_ACTION_NAME.equals(action)) {
Tuple<DiscoveryNode, Long> ping = Tuple.tuple(sourceNode, requestId);
if (inflightPings.remove(ping)) {
completedPings.add(ping);
waitForPings.countDown();
}
}
}
public int completedPings() {
return completedPings.size();
}
public void awaitMinCompletedPings() throws InterruptedException {
waitForPings.await();
}
}
}

View File

@ -743,8 +743,8 @@ public class IndicesRequestTests extends ESIntegTestCase {
}
@Override
public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> request, String executor, boolean forceExecution, TransportRequestHandler<Request> handler) {
super.registerRequestHandler(action, request, executor, forceExecution, new InterceptingRequestHandler<>(action, handler));
public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> request, String executor, boolean forceExecution, boolean canTripCircuitBreaker, TransportRequestHandler<Request> handler) {
super.registerRequestHandler(action, request, executor, forceExecution, canTripCircuitBreaker, new InterceptingRequestHandler<>(action, handler));
}
@Override