Limit request size on transport level
With this commit we limit the size of all in-flight requests at the transport level. The size is guarded by a circuit breaker and is based on the content size of each request. By default we use 100% of the available heap, meaning that the parent circuit breaker will limit the maximum available size. This value can be changed by adjusting the setting network.breaker.inflight_requests.limit. Relates #16011
Parent: a581d7cca4
Commit: 52b2016447
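Because the new limit is registered as a dynamic node-scope setting (see the HierarchyCircuitBreakerService hunks below), it can be tuned without code changes. The following is a minimal, illustrative sketch only, reusing the Settings.builder() API that appears elsewhere in this diff; the 50% value is an arbitrary example, not a recommendation from the commit.

import org.elasticsearch.common.settings.Settings;

public class InFlightLimitExample {
    public static void main(String[] args) {
        // Lower the in-flight requests breaker from its 100% default to half the heap
        // (example value only) and keep the default overhead multiplier of 1.0.
        Settings settings = Settings.builder()
            .put("network.breaker.inflight_requests.limit", "50%")
            .put("network.breaker.inflight_requests.overhead", 1.0)
            .build();
        System.out.println(settings.get("network.breaker.inflight_requests.limit"));
    }
}

Since the setting is marked Property.Dynamic, the same keys could also be applied at runtime through a cluster settings update rather than node settings.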
@@ -28,11 +28,12 @@ import java.util.Locale;
 */
public interface CircuitBreaker {

public static final String PARENT = "parent";
public static final String FIELDDATA = "fielddata";
public static final String REQUEST = "request";
String PARENT = "parent";
String FIELDDATA = "fielddata";
String REQUEST = "request";
String IN_FLIGHT_REQUESTS = "in_flight_requests";

public static enum Type {
enum Type {
// A regular or child MemoryCircuitBreaker
MEMORY,
// A special parent-type for the hierarchy breaker service
@@ -59,7 +60,7 @@ public interface CircuitBreaker {
* @param fieldName name of the field responsible for tripping the breaker
* @param bytesNeeded bytes asked for but unable to be allocated
*/
public void circuitBreak(String fieldName, long bytesNeeded);
void circuitBreak(String fieldName, long bytesNeeded);

/**
* add bytes to the breaker and maybe trip
@@ -67,35 +68,35 @@ public interface CircuitBreaker {
* @param label string label describing the bytes being added
* @return the number of "used" bytes for the circuit breaker
*/
public double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException;
double addEstimateBytesAndMaybeBreak(long bytes, String label) throws CircuitBreakingException;

/**
* Adjust the circuit breaker without tripping
*/
public long addWithoutBreaking(long bytes);
long addWithoutBreaking(long bytes);

/**
* @return the currently used bytes the breaker is tracking
*/
public long getUsed();
long getUsed();

/**
* @return maximum number of bytes the circuit breaker can track before tripping
*/
public long getLimit();
long getLimit();

/**
* @return overhead of circuit breaker
*/
public double getOverhead();
double getOverhead();

/**
* @return the number of times the circuit breaker has been tripped
*/
public long getTrippedCount();
long getTrippedCount();

/**
* @return the name of the breaker
*/
public String getName();
String getName();
}
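The interface above defines the reserve/release contract the rest of the commit builds on: addEstimateBytesAndMaybeBreak may trip, addWithoutBreaking never does. As a hedged sketch (the helper class and its name are not part of the commit), the pattern the transport channels implement by hand could be expressed like this:

import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;

// Hypothetical helper: reserve bytes on the breaker up front and guarantee the
// reservation is returned exactly once when the scope closes.
public final class BreakerReservation implements AutoCloseable {
    private final CircuitBreaker breaker;
    private final long bytes;

    public BreakerReservation(CircuitBreaker breaker, long bytes) throws CircuitBreakingException {
        this.breaker = breaker;
        this.bytes = bytes;
        // Throws CircuitBreakingException if the new total would exceed the limit.
        breaker.addEstimateBytesAndMaybeBreak(bytes, "<transport_request>");
    }

    @Override
    public void close() {
        // Returning bytes must never trip, hence addWithoutBreaking with a negative delta.
        breaker.addWithoutBreaking(-bytes);
    }
}

Usage would be try (BreakerReservation r = new BreakerReservation(breaker, messageLength)) { /* handle request */ }.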
@@ -82,7 +82,7 @@ public class MemoryCircuitBreaker implements CircuitBreaker {
final String message = "Data too large, data for field [" + fieldName + "] would be larger than limit of [" +
memoryBytesLimit + "/" + new ByteSizeValue(memoryBytesLimit) + "]";
logger.debug("{}", message);
throw new CircuitBreakingException(message);
throw new CircuitBreakingException(message, bytesNeeded, memoryBytesLimit);
}

/**
@@ -24,6 +24,7 @@ package org.elasticsearch.common.breaker;
* basically noops
*/
public class NoopCircuitBreaker implements CircuitBreaker {
public static final int LIMIT = -1;

private final String name;

@@ -53,7 +54,7 @@ public class NoopCircuitBreaker implements CircuitBreaker {

@Override
public long getLimit() {
return 0;
return LIMIT;
}

@Override
@@ -254,6 +254,8 @@ public final class ClusterSettings extends AbstractScopedSettings {
HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING,
HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING,
HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING,
HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING,
HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING,
HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING,
HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING,
ClusterService.CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
@@ -45,7 +45,7 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {

private static final String CHILD_LOGGER_PREFIX = "org.elasticsearch.indices.breaker.";

private final ConcurrentMap<String, CircuitBreaker> breakers = new ConcurrentHashMap();
private final ConcurrentMap<String, CircuitBreaker> breakers = new ConcurrentHashMap<>();

public static final Setting<ByteSizeValue> TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING =
Setting.byteSizeSetting("indices.breaker.total.limit", "70%", Property.Dynamic, Property.NodeScope);
@@ -64,10 +64,16 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
public static final Setting<CircuitBreaker.Type> REQUEST_CIRCUIT_BREAKER_TYPE_SETTING =
new Setting<>("indices.breaker.request.type", "memory", CircuitBreaker.Type::parseValue, Property.NodeScope);

public static final Setting<ByteSizeValue> IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING =
Setting.byteSizeSetting("network.breaker.inflight_requests.limit", "100%", Property.Dynamic, Property.NodeScope);
public static final Setting<Double> IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING =
Setting.doubleSetting("network.breaker.inflight_requests.overhead", 1.0d, 0.0d, Property.Dynamic, Property.NodeScope);
public static final Setting<CircuitBreaker.Type> IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_TYPE_SETTING =
new Setting<>("network.breaker.inflight_requests.type", "memory", CircuitBreaker.Type::parseValue, Property.NodeScope);

private volatile BreakerSettings parentSettings;
private volatile BreakerSettings fielddataSettings;
private volatile BreakerSettings inFlightRequestsSettings;
private volatile BreakerSettings requestSettings;

// Tripped count for when redistribution was attempted but wasn't successful
@@ -82,6 +88,12 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
FIELDDATA_CIRCUIT_BREAKER_TYPE_SETTING.get(settings)
);

this.inFlightRequestsSettings = new BreakerSettings(CircuitBreaker.IN_FLIGHT_REQUESTS,
IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).bytes(),
IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING.get(settings),
IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_TYPE_SETTING.get(settings)
);

this.requestSettings = new BreakerSettings(CircuitBreaker.REQUEST,
REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.get(settings).bytes(),
REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING.get(settings),
@@ -95,11 +107,14 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {

registerBreaker(this.requestSettings);
registerBreaker(this.fielddataSettings);
registerBreaker(this.inFlightRequestsSettings);

clusterSettings.addSettingsUpdateConsumer(TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING, this::setTotalCircuitBreakerLimit, this::validateTotalCircuitBreakerLimit);
clusterSettings.addSettingsUpdateConsumer(FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING, FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING, this::setFieldDataBreakerLimit);
clusterSettings.addSettingsUpdateConsumer(IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING, IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING, this::setInFlightRequestsBreakerLimit);
clusterSettings.addSettingsUpdateConsumer(REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING, REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING, this::setRequestBreakerLimit);
}

private void setRequestBreakerLimit(ByteSizeValue newRequestMax, Double newRequestOverhead) {
BreakerSettings newRequestSettings = new BreakerSettings(CircuitBreaker.REQUEST, newRequestMax.bytes(), newRequestOverhead,
HierarchyCircuitBreakerService.this.requestSettings.getType());
@@ -108,6 +123,14 @@ public class HierarchyCircuitBreakerService extends CircuitBreakerService {
logger.info("Updated breaker settings request: {}", newRequestSettings);
}

private void setInFlightRequestsBreakerLimit(ByteSizeValue newInFlightRequestsMax, Double newInFlightRequestsOverhead) {
BreakerSettings newInFlightRequestsSettings = new BreakerSettings(CircuitBreaker.IN_FLIGHT_REQUESTS, newInFlightRequestsMax.bytes(),
newInFlightRequestsOverhead, HierarchyCircuitBreakerService.this.inFlightRequestsSettings.getType());
registerBreaker(newInFlightRequestsSettings);
HierarchyCircuitBreakerService.this.inFlightRequestsSettings = newInFlightRequestsSettings;
logger.info("Updated breaker settings for in-flight requests: {}", newInFlightRequestsSettings);
}

private void setFieldDataBreakerLimit(ByteSizeValue newFielddataMax, Double newFielddataOverhead) {
long newFielddataLimitBytes = newFielddataMax == null ? HierarchyCircuitBreakerService.this.fielddataSettings.getLimit() : newFielddataMax.bytes();
newFielddataOverhead = newFielddataOverhead == null ? HierarchyCircuitBreakerService.this.fielddataSettings.getOverhead() : newFielddataOverhead;
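The hunks above follow a consistent dynamic-update pattern: the settings consumer rebuilds an immutable BreakerSettings object, re-registers the breaker, and swaps a volatile field. A simplified, framework-free model of that pattern (none of the names below are Elasticsearch classes; this is only a sketch of the idea) looks like this:

import java.util.function.BiConsumer;

public class InFlightBreakerUpdater {
    static final class Config {
        final long limitBytes;
        final double overhead;
        Config(long limitBytes, double overhead) { this.limitBytes = limitBytes; this.overhead = overhead; }
        @Override public String toString() { return limitBytes + " bytes, overhead " + overhead; }
    }

    // Published via a volatile reference so readers always see a fully built config.
    private volatile Config inFlightRequests = new Config(Runtime.getRuntime().maxMemory(), 1.0);

    // Stand-in for the consumer registered with clusterSettings.addSettingsUpdateConsumer(...).
    final BiConsumer<Long, Double> onUpdate = (newLimit, newOverhead) -> {
        Config updated = new Config(newLimit, newOverhead);
        // a real implementation would re-register the live breaker here before publishing
        inFlightRequests = updated;
        System.out.println("Updated breaker settings for in-flight requests: " + updated);
    };

    public static void main(String[] args) {
        new InFlightBreakerUpdater().onUpdate.accept(512L * 1024 * 1024, 1.0); // e.g. 512mb
    }
}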
@@ -85,7 +85,8 @@ public interface Transport extends LifecycleComponent<Transport> {
/**
* Sends the request to the node.
*/
void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException;
void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws
IOException, TransportException;

/**
* Returns count of currently open connections
@@ -23,6 +23,7 @@ import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
@@ -37,6 +38,7 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.transport.ConnectTransportException;
@@ -83,13 +85,15 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
private static final AtomicLong transportAddressIdGenerator = new AtomicLong();
private final ConcurrentMap<DiscoveryNode, LocalTransport> connectedNodes = newConcurrentMap();
protected final NamedWriteableRegistry namedWriteableRegistry;
private final CircuitBreakerService circuitBreakerService;

public static final String TRANSPORT_LOCAL_ADDRESS = "transport.local.address";
public static final String TRANSPORT_LOCAL_WORKERS = "transport.local.workers";
public static final String TRANSPORT_LOCAL_QUEUE = "transport.local.queue";

@Inject
public LocalTransport(Settings settings, ThreadPool threadPool, Version version, NamedWriteableRegistry namedWriteableRegistry) {
public LocalTransport(Settings settings, ThreadPool threadPool, Version version,
NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {
super(settings);
this.threadPool = threadPool;
this.version = version;
@@ -100,6 +104,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
this.workers = EsExecutors.newFixed(LOCAL_TRANSPORT_THREAD_NAME_PREFIX, workerCount, queueSize, threadFactory,
threadPool.getThreadContext());
this.namedWriteableRegistry = namedWriteableRegistry;
this.circuitBreakerService = circuitBreakerService;
}

@Override
@@ -239,6 +244,11 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
return this.workers;
}

CircuitBreaker inFlightRequestsBreaker() {
// We always obtain a fresh breaker to reflect changes to the breaker configuration.
return circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
}

protected void messageReceived(byte[] data, String action, LocalTransport sourceTransport, Version version,
@Nullable final Long sendRequestId) {
Transports.assertTransportThread();
@@ -253,13 +263,13 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
if (isRequest) {
ThreadContext threadContext = threadPool.getThreadContext();
threadContext.readHeaders(stream);
handleRequest(stream, requestId, sourceTransport, version);
handleRequest(stream, requestId, data.length, sourceTransport, version);
} else {
final TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId);
// ignore if its null, the adapter logs it
if (handler != null) {
if (TransportStatus.isError(status)) {
handlerResponseError(stream, handler);
handleResponseError(stream, handler);
} else {
handleResponse(stream, sourceTransport, handler);
}
@@ -267,9 +277,15 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
}
} catch (Throwable e) {
if (sendRequestId != null) {
TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(sendRequestId);
TransportResponseHandler handler = sourceTransport.transportServiceAdapter.onResponseReceived(sendRequestId);
if (handler != null) {
handleException(handler, new RemoteTransportException(nodeName(), localAddress, action, e));
RemoteTransportException error = new RemoteTransportException(nodeName(), localAddress, action, e);
sourceTransport.workers().execute(() -> {
ThreadContext threadContext = sourceTransport.threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
sourceTransport.handleException(handler, error);
}
});
}
} else {
logger.warn("Failed to receive message for action [{}]", e, action);
@@ -277,12 +293,14 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
}
}

private void handleRequest(StreamInput stream, long requestId, LocalTransport sourceTransport, Version version) throws Exception {
private void handleRequest(StreamInput stream, long requestId, int messageLengthBytes, LocalTransport sourceTransport,
Version version) throws Exception {
stream = new NamedWriteableAwareStreamInput(stream, namedWriteableRegistry);
final String action = stream.readString();
transportServiceAdapter.onRequestReceived(requestId, action);
inFlightRequestsBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_request>");
final LocalTransportChannel transportChannel = new LocalTransportChannel(this, transportServiceAdapter, sourceTransport, action,
requestId, version);
requestId, version, messageLengthBytes);
try {
final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);
if (reg == null) {
@@ -356,7 +374,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
});
}

private void handlerResponseError(StreamInput buffer, final TransportResponseHandler handler) {
private void handleResponseError(StreamInput buffer, final TransportResponseHandler handler) {
Throwable error;
try {
error = buffer.readThrowable();
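The LocalTransport change above establishes the accounting flow reused by the Netty transport further down: the raw message length is charged against the in-flight breaker before the request is deserialized, and the reservation travels with the channel so it can be returned when a response is sent. A condensed sketch of that flow, with hypothetical class names (only the CircuitBreaker interface comes from this diff):

import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.breaker.CircuitBreakingException;

public class InFlightAccountingSketch {

    static final class Channel {
        private final CircuitBreaker breaker;
        private final long reservedBytes;
        Channel(CircuitBreaker breaker, long reservedBytes) {
            this.breaker = breaker;
            this.reservedBytes = reservedBytes;
        }
        void sendResponse() {
            // Releasing must not trip, so use addWithoutBreaking with a negative delta.
            breaker.addWithoutBreaking(-reservedBytes);
        }
    }

    Channel onRequest(CircuitBreaker inFlightBreaker, byte[] message) throws CircuitBreakingException {
        // May throw before any request bytes are parsed, rejecting the request cheaply.
        inFlightBreaker.addEstimateBytesAndMaybeBreak(message.length, "<transport_request>");
        return new Channel(inFlightBreaker, message.length);
    }
}

Charging data.length rather than the deserialized object size keeps the estimate cheap and available before any parsing work is done.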
@@ -30,6 +30,7 @@ import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.support.TransportStatus;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 *
@@ -45,15 +46,18 @@ public class LocalTransportChannel implements TransportChannel {
private final String action;
private final long requestId;
private final Version version;
private final long reservedBytes;
private final AtomicBoolean closed = new AtomicBoolean();

public LocalTransportChannel(LocalTransport sourceTransport, TransportServiceAdapter sourceTransportServiceAdapter,
LocalTransport targetTransport, String action, long requestId, Version version) {
LocalTransport targetTransport, String action, long requestId, Version version, long reservedBytes) {
this.sourceTransport = sourceTransport;
this.sourceTransportServiceAdapter = sourceTransportServiceAdapter;
this.targetTransport = targetTransport;
this.action = action;
this.requestId = requestId;
this.version = version;
this.reservedBytes = reservedBytes;
}

@Override
@@ -80,13 +84,7 @@ public class LocalTransportChannel implements TransportChannel {
status = TransportStatus.setResponse(status);
stream.writeByte(status); // 0 for request, 1 for response.
response.writeTo(stream);
final byte[] data = stream.bytes().toBytes();
targetTransport.workers().execute(() -> {
ThreadContext threadContext = targetTransport.threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()){
targetTransport.messageReceived(data, action, sourceTransport, version, null);
}
});
sendResponseData(stream.bytes().toBytes());
sourceTransportServiceAdapter.onResponseSent(requestId, action, response, options);
}
}
@@ -98,15 +96,26 @@ public class LocalTransportChannel implements TransportChannel {
RemoteTransportException tx = new RemoteTransportException(targetTransport.nodeName(),
targetTransport.boundAddress().boundAddresses()[0], action, error);
stream.writeThrowable(tx);
sendResponseData(stream.bytes().toBytes());
sourceTransportServiceAdapter.onResponseSent(requestId, action, error);
}

final byte[] data = stream.bytes().toBytes();
private void sendResponseData(byte[] data) {
close();
targetTransport.workers().execute(() -> {
ThreadContext threadContext = targetTransport.threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()){
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
targetTransport.messageReceived(data, action, sourceTransport, version, null);
}
});
sourceTransportServiceAdapter.onResponseSent(requestId, action, error);
}

private void close() {
// attempt to close once atomically
if (closed.compareAndSet(false, true) == false) {
throw new IllegalStateException("Channel is already closed");
}
sourceTransport.inFlightRequestsBreaker().addWithoutBreaking(-reservedBytes);
}

@Override
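The close() method above (and its twin in NettyTransportChannel later in this diff) enforces a release-exactly-once discipline: the first response wins the compare-and-set and returns the reserved bytes, a second attempt fails loudly instead of double-releasing breaker memory. Stripped of the transport plumbing, the discipline is just this (the class name is illustrative, not from the commit):

import java.util.concurrent.atomic.AtomicBoolean;
import org.elasticsearch.common.breaker.CircuitBreaker;

public final class CloseOnceSketch {
    private final AtomicBoolean closed = new AtomicBoolean();
    private final CircuitBreaker breaker;
    private final long reservedBytes;

    public CloseOnceSketch(CircuitBreaker breaker, long reservedBytes) {
        this.breaker = breaker;
        this.reservedBytes = reservedBytes;
    }

    public void close() {
        // Only one caller may transition closed from false to true.
        if (closed.compareAndSet(false, true) == false) {
            throw new IllegalStateException("Channel is already closed");
        }
        // Give the reserved bytes back without ever tripping the breaker.
        breaker.addWithoutBreaking(-reservedBytes);
    }
}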
@@ -91,17 +91,15 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
return;
}
ChannelBuffer buffer = (ChannelBuffer) m;
int size = buffer.getInt(buffer.readerIndex() - 4);
transportServiceAdapter.received(size + 6);
Marker marker = new Marker(buffer);
int size = marker.messageSizeWithRemainingHeaders();
transportServiceAdapter.received(marker.messageSizeWithAllHeaders());

// we have additional bytes to read, outside of the header
boolean hasMessageBytesToRead = (size - (NettyHeader.HEADER_SIZE - 6)) != 0;

int markedReaderIndex = buffer.readerIndex();
int expectedIndexReader = markedReaderIndex + size;
boolean hasMessageBytesToRead = marker.messageSize() != 0;

// netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
// buffer, or in the cumlation buffer, which is cleaned each time
// buffer, or in the cumulation buffer, which is cleaned each time
StreamInput streamIn = ChannelBufferStreamInputFactory.create(buffer, size);
boolean success = false;
try (ThreadContext.StoredContext tCtx = threadContext.stashContext()) {
@@ -134,25 +132,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
streamIn.setVersion(version);
if (TransportStatus.isRequest(status)) {
threadContext.readHeaders(streamIn);
String action = handleRequest(ctx.getChannel(), streamIn, requestId, version);

// Chek the entire message has been read
final int nextByte = streamIn.read();
// calling read() is useful to make sure the message is fully read, even if there some kind of EOS marker
if (nextByte != -1) {
throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action [" + action
+ "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting");
}
if (buffer.readerIndex() < expectedIndexReader) {
throw new IllegalStateException("Message is fully read (request), yet there are "
+ (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting");
}
if (buffer.readerIndex() > expectedIndexReader) {
throw new IllegalStateException(
"Message read past expected size (request) for requestId [" + requestId + "], action [" + action
+ "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting");
}

handleRequest(ctx.getChannel(), marker, streamIn, requestId, size, version);
} else {
TransportResponseHandler<?> handler = transportServiceAdapter.onResponseReceived(requestId);
// ignore if its null, the adapter logs it
@@ -162,25 +142,10 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
} else {
handleResponse(ctx.getChannel(), streamIn, handler);
}

// Chek the entire message has been read
final int nextByte = streamIn.read();
// calling read() is useful to make sure the message is fully read, even if there is an EOS marker
if (nextByte != -1) {
throw new IllegalStateException("Message not fully read (response) for requestId [" + requestId + "], handler ["
+ handler + "], error [" + TransportStatus.isError(status) + "]; resetting");
}
if (buffer.readerIndex() < expectedIndexReader) {
throw new IllegalStateException("Message is fully read (response), yet there are "
+ (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting");
}
if (buffer.readerIndex() > expectedIndexReader) {
throw new IllegalStateException("Message read past expected size (response) for requestId [" + requestId
+ "], handler [" + handler + "], error [" + TransportStatus.isError(status) + "]; resetting");
}

marker.validateResponse(streamIn, requestId, handler, TransportStatus.isError(status));
}
}
success = true;
} finally {
try {
if (success) {
@@ -190,7 +155,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
}
} finally {
// Set the expected position of the buffer, no matter what happened
buffer.readerIndex(expectedIndexReader);
buffer.readerIndex(marker.expectedReaderIndex());
}
}
}
@@ -254,13 +219,17 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
}
}

protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
protected String handleRequest(Channel channel, Marker marker, StreamInput buffer, long requestId, int messageLengthBytes,
Version version) throws IOException {
buffer = new NamedWriteableAwareStreamInput(buffer, transport.namedWriteableRegistry);
final String action = buffer.readString();
transportServiceAdapter.onRequestReceived(requestId, action);
final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel,
requestId, version, profileName);
NettyTransportChannel transportChannel = null;
try {
transport.inFlightRequestsBreaker().addEstimateBytesAndMaybeBreak(messageLengthBytes, "<transport_request>");
transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel,
requestId, version, profileName, messageLengthBytes);

final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);
if (reg == null) {
throw new ActionNotFoundTransportException(action);
@@ -268,6 +237,8 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
final TransportRequest request = reg.newRequest();
request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
request.readFrom(buffer);
// in case we throw an exception, i.e. when the limit is hit, we don't want to verify
validateRequest(marker, buffer, requestId, action);
if (ThreadPool.Names.SAME.equals(reg.getExecutor())) {
//noinspection unchecked
reg.processMessageReceived(request, transportChannel);
@@ -275,6 +246,11 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel));
}
} catch (Throwable e) {
// the circuit breaker tripped
if (transportChannel == null) {
transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel,
requestId, version, profileName, 0);
}
try {
transportChannel.sendResponse(e);
} catch (IOException e1) {
@@ -285,6 +261,12 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
return action;
}

// This template method is needed to inject custom error checking logic in tests.
protected void validateRequest(Marker marker, StreamInput buffer, long requestId, String action) throws IOException {
marker.validateRequest(buffer, requestId, action);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
transport.exceptionCaught(ctx, e);
@@ -346,4 +328,106 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
}
}
}

/**
* Internal helper class to store characteristic offsets of a buffer during processing
*/
protected static final class Marker {
private final ChannelBuffer buffer;
private final int remainingMessageSize;
private final int expectedReaderIndex;

public Marker(ChannelBuffer buffer) {
this.buffer = buffer;
// when this constructor is called, we have read already two parts of the message header: the marker bytes and the message
// message length (see SizeHeaderFrameDecoder). Hence we have to rewind the index for MESSAGE_LENGTH_SIZE bytes to read the
// remaining message length again.
this.remainingMessageSize = buffer.getInt(buffer.readerIndex() - NettyHeader.MESSAGE_LENGTH_SIZE);
this.expectedReaderIndex = buffer.readerIndex() + remainingMessageSize;
}

/**
* @return the number of bytes that have yet to be read from the buffer
*/
public int messageSizeWithRemainingHeaders() {
return remainingMessageSize;
}

/**
* @return the number in bytes for the message including all headers (even the ones that have been read from the buffer already)
*/
public int messageSizeWithAllHeaders() {
return remainingMessageSize + NettyHeader.MARKER_BYTES_SIZE + NettyHeader.MESSAGE_LENGTH_SIZE;
}

/**
* @return the number of bytes for the message itself (excluding all headers).
*/
public int messageSize() {
return messageSizeWithAllHeaders() - NettyHeader.HEADER_SIZE;
}

/**
* @return the expected index of the buffer's reader after the message has been consumed entirely.
*/
public int expectedReaderIndex() {
return expectedReaderIndex;
}

/**
* Validates that a request has been fully read (not too few bytes but also not too many bytes).
*
* @param stream A stream that is associated with the buffer that is tracked by this marker.
* @param requestId The current request id.
* @param action The currently executed action.
* @throws IOException Iff the stream could not be read.
* @throws IllegalStateException Iff the request has not been fully read.
*/
public void validateRequest(StreamInput stream, long requestId, String action) throws IOException {
final int nextByte = stream.read();
// calling read() is useful to make sure the message is fully read, even if there some kind of EOS marker
if (nextByte != -1) {
throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action [" + action
+ "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedReaderIndex + "]; resetting");
}
if (buffer.readerIndex() < expectedReaderIndex) {
throw new IllegalStateException("Message is fully read (request), yet there are "
+ (expectedReaderIndex - buffer.readerIndex()) + " remaining bytes; resetting");
}
if (buffer.readerIndex() > expectedReaderIndex) {
throw new IllegalStateException(
"Message read past expected size (request) for requestId [" + requestId + "], action [" + action
+ "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedReaderIndex + "]; resetting");
}
}

/**
* Validates that a response has been fully read (not too few bytes but also not too many bytes).
*
* @param stream A stream that is associated with the buffer that is tracked by this marker.
* @param requestId The corresponding request id for this response.
* @param handler The current response handler.
* @param error Whether validate an error response.
* @throws IOException Iff the stream could not be read.
* @throws IllegalStateException Iff the request has not been fully read.
*/
public void validateResponse(StreamInput stream, long requestId,
TransportResponseHandler<?> handler, boolean error) throws IOException {
// Check the entire message has been read
final int nextByte = stream.read();
// calling read() is useful to make sure the message is fully read, even if there is an EOS marker
if (nextByte != -1) {
throw new IllegalStateException("Message not fully read (response) for requestId [" + requestId + "], handler ["
+ handler + "], error [" + error + "]; resetting");
}
if (buffer.readerIndex() < expectedReaderIndex) {
throw new IllegalStateException("Message is fully read (response), yet there are "
+ (expectedReaderIndex - buffer.readerIndex()) + " remaining bytes; resetting");
}
if (buffer.readerIndex() > expectedReaderIndex) {
throw new IllegalStateException("Message read past expected size (response) for requestId [" + requestId
+ "], handler [" + handler + "], error [" + error + "]; resetting");
}
}
}
}
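The new Marker class centralizes bookkeeping that was previously inlined: it records the expected end position of the buffer up front and later checks whether the handler read too little or too much. The arithmetic reduces to plain integers; the following self-contained illustration (no Netty types, invented example values) shows the three checks it performs:

public class MarkerArithmetic {
    public static void main(String[] args) {
        int readerIndexAfterLengthField = 6;   // marker bytes (2) + message length field (4) already consumed
        int remainingMessageSize = 42;         // value read from the length field
        int expectedReaderIndex = readerIndexAfterLengthField + remainingMessageSize;

        int readerIndexAfterHandler = 48;      // position actually reached after handling the message
        if (readerIndexAfterHandler < expectedReaderIndex) {
            throw new IllegalStateException((expectedReaderIndex - readerIndexAfterHandler)
                + " remaining bytes; resetting");
        }
        if (readerIndexAfterHandler > expectedReaderIndex) {
            throw new IllegalStateException("read past expected size; resetting");
        }
        System.out.println("message fully read, expected index " + expectedReaderIndex);
    }
}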
@@ -26,8 +26,17 @@ import org.jboss.netty.buffer.ChannelBuffers;
/**
*/
public class NettyHeader {
public static final int MARKER_BYTES_SIZE = 2 * 1;

public static final int HEADER_SIZE = 2 + 4 + 8 + 1 + 4;
public static final int MESSAGE_LENGTH_SIZE = 4;

public static final int REQUEST_ID_SIZE = 8;

public static final int STATUS_SIZE = 1;

public static final int VERSION_ID_SIZE = 4;

public static final int HEADER_SIZE = MARKER_BYTES_SIZE + MESSAGE_LENGTH_SIZE + REQUEST_ID_SIZE + STATUS_SIZE + VERSION_ID_SIZE;

/**
* The magic number (must be lower than 0) for a ping message. This is handled
@@ -56,12 +65,12 @@ public class NettyHeader {
buffer.setByte(index, 'S');
index += 1;
// write the size, the size indicates the remaining message size, not including the size int
buffer.setInt(index, buffer.readableBytes() - 6);
index += 4;
buffer.setInt(index, buffer.readableBytes() - MARKER_BYTES_SIZE - MESSAGE_LENGTH_SIZE);
index += MESSAGE_LENGTH_SIZE;
buffer.setLong(index, requestId);
index += 8;
index += REQUEST_ID_SIZE;
buffer.setByte(index, status);
index += 1;
index += STATUS_SIZE;
buffer.setInt(index, version.id);
}
}
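Replacing the magic numbers with named constants does not change the wire format; HEADER_SIZE is still 2 + 4 + 8 + 1 + 4 = 19 bytes. A short worked example of the offsets those sizes imply (the class below is purely illustrative; only the sizes come from the diff):

public class NettyHeaderLayout {
    public static void main(String[] args) {
        final int markerBytes = 2;      // 'E', 'S'
        final int messageLength = 4;    // int: remaining message size
        final int requestId = 8;        // long
        final int status = 1;           // byte
        final int versionId = 4;        // int

        int headerSize = markerBytes + messageLength + requestId + status + versionId;
        System.out.println("HEADER_SIZE = " + headerSize);                                          // 19
        System.out.println("length field at offset " + markerBytes);                                // 2
        System.out.println("request id at offset " + (markerBytes + messageLength));                // 6
        System.out.println("status byte at offset " + (markerBytes + messageLength + requestId));   // 14
        System.out.println("version id at offset " + (markerBytes + messageLength + requestId + status)); // 15
    }
}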
@@ -26,6 +26,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.CompressorFactory;
@@ -56,6 +57,7 @@ import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractLifecycleRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BindTransportException;
@@ -242,6 +244,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
protected volatile BoundTransportAddress boundAddress;
protected final KeyedLock<String> connectionLock = new KeyedLock<>();
protected final NamedWriteableRegistry namedWriteableRegistry;
private final CircuitBreakerService circuitBreakerService;

// this lock is here to make sure we close this transport and disconnect all the client nodes
// connections while no connect operations is going on... (this might help with 100% CPU when stopping the transport?)
@@ -252,7 +255,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem

@Inject
public NettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version,
NamedWriteableRegistry namedWriteableRegistry) {
NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {
super(settings);
this.threadPool = threadPool;
this.networkService = networkService;
@@ -288,6 +291,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
threadPool.schedule(pingSchedule, ThreadPool.Names.GENERIC, scheduledPing);
}
this.namedWriteableRegistry = namedWriteableRegistry;
this.circuitBreakerService = circuitBreakerService;
}

public Settings settings() {
@@ -307,6 +311,11 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
return threadPool;
}

CircuitBreaker inFlightRequestsBreaker() {
// We always obtain a fresh breaker to reflect changes to the breaker configuration.
return circuitBreakerService.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
}

@Override
protected void doStart() {
boolean success = false;
@@ -39,6 +39,7 @@ import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
*
@@ -52,9 +53,11 @@ public class NettyTransportChannel implements TransportChannel {
private final Channel channel;
private final long requestId;
private final String profileName;
private final long reservedBytes;
private final AtomicBoolean closed = new AtomicBoolean();

public NettyTransportChannel(NettyTransport transport, TransportServiceAdapter transportServiceAdapter, String action, Channel channel,
long requestId, Version version, String profileName) {
long requestId, Version version, String profileName, long reservedBytes) {
this.transportServiceAdapter = transportServiceAdapter;
this.version = version;
this.transport = transport;
@@ -62,6 +65,7 @@ public class NettyTransportChannel implements TransportChannel {
this.channel = channel;
this.requestId = requestId;
this.profileName = profileName;
this.reservedBytes = reservedBytes;
}

@Override
@@ -81,6 +85,7 @@ public class NettyTransportChannel implements TransportChannel {

@Override
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
close();
if (transport.compress) {
options = TransportResponseOptions.builder(options).withCompress(transport.compress).build();
}
@@ -88,9 +93,10 @@ public class NettyTransportChannel implements TransportChannel {
byte status = 0;
status = TransportStatus.setResponse(status);

ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(transport.bigArrays);
ReleasableBytesStreamOutput bStream = null;
boolean addedReleaseListener = false;
try {
bStream = new ReleasableBytesStreamOutput(transport.bigArrays);
bStream.skip(NettyHeader.HEADER_SIZE);
StreamOutput stream = bStream;
if (options.compress()) {
@@ -110,7 +116,7 @@ public class NettyTransportChannel implements TransportChannel {
addedReleaseListener = true;
transportServiceAdapter.onResponseSent(requestId, action, response, options);
} finally {
if (!addedReleaseListener) {
if (!addedReleaseListener && bStream != null) {
Releasables.close(bStream.bytes());
}
}
@@ -118,10 +124,11 @@ public class NettyTransportChannel implements TransportChannel {

@Override
public void sendResponse(Throwable error) throws IOException {
close();
BytesStreamOutput stream = new BytesStreamOutput();
stream.skip(NettyHeader.HEADER_SIZE);
RemoteTransportException tx = new RemoteTransportException(transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()),
action, error);
RemoteTransportException tx = new RemoteTransportException(
transport.nodeName(), transport.wrapAddress(channel.getLocalAddress()), action, error);
stream.writeThrowable(tx);
byte status = 0;
status = TransportStatus.setResponse(status);
@@ -134,6 +141,14 @@ public class NettyTransportChannel implements TransportChannel {
transportServiceAdapter.onResponseSent(requestId, action, error);
}

private void close() {
// attempt to close once atomically
if (closed.compareAndSet(false, true) == false) {
throw new IllegalStateException("Channel is already closed");
}
transport.inFlightRequestsBreaker().addWithoutBreaking(-reservedBytes);
}

@Override
public long getRequestId() {
return requestId;
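One subtle change above: the releasable byte stream is now allocated inside the try block, and the finally block only releases it when it was actually created and never handed to the channel's release listener. A reduced, hypothetical version of that ownership pattern (none of the types below are the real Elasticsearch classes):

public class ReleaseOnFailureSketch {
    interface ReleasableBuffer { void release(); }

    void sendResponse(java.util.function.Supplier<ReleasableBuffer> allocator) {
        ReleasableBuffer buffer = null;
        boolean handedOff = false;
        try {
            buffer = allocator.get();      // allocation may fail; buffer stays null in that case
            // ... serialize the response into the buffer and queue the channel write ...
            handedOff = true;              // the write's completion listener now owns the buffer
        } finally {
            if (handedOff == false && buffer != null) {
                buffer.release();          // release on every failure path, but never twice
            }
        }
    }
}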
@@ -41,7 +41,8 @@ public class SizeHeaderFrameDecoder extends FrameDecoder {

@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
if (buffer.readableBytes() < 6) {
final int sizeHeaderLength = NettyHeader.MARKER_BYTES_SIZE + NettyHeader.MESSAGE_LENGTH_SIZE;
if (buffer.readableBytes() < sizeHeaderLength) {
return null;
}

@@ -68,11 +69,11 @@ public class SizeHeaderFrameDecoder extends FrameDecoder {
+ Integer.toHexString(buffer.getByte(readerIndex + 3) & 0xFF) + ")");
}

int dataLen = buffer.getInt(buffer.readerIndex() + 2);
int dataLen = buffer.getInt(buffer.readerIndex() + NettyHeader.MARKER_BYTES_SIZE);
if (dataLen == NettyHeader.PING_DATA_SIZE) {
// discard the messages we read and continue, this is achieved by skipping the bytes
// and returning null
buffer.skipBytes(6);
buffer.skipBytes(sizeHeaderLength);
return null;
}
if (dataLen <= 0) {
@@ -84,10 +85,10 @@ public class SizeHeaderFrameDecoder extends FrameDecoder {
+ new ByteSizeValue(NINETY_PER_HEAP_SIZE) + "]");
}

if (buffer.readableBytes() < dataLen + 6) {
if (buffer.readableBytes() < dataLen + sizeHeaderLength) {
return null;
}
buffer.skipBytes(6);
buffer.skipBytes(sizeHeaderLength);
return buffer;
}

@@ -121,4 +122,4 @@ public class SizeHeaderFrameDecoder extends FrameDecoder {
super(in);
}
}
}
}
}
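The decoder hunks above only swap literals for the NettyHeader constants, but they make the framing rule explicit: a frame is decodable once the 2 marker bytes, the 4-byte length field, and the announced payload are all readable, and a special sentinel length marks a ping that is skipped. A simplified, framework-free model of that rule (java.nio instead of Netty; the sentinel value and class are assumptions for illustration):

import java.nio.ByteBuffer;

public class SizeHeaderSketch {
    static final int MARKER_BYTES_SIZE = 2;
    static final int MESSAGE_LENGTH_SIZE = 4;
    static final int PING_DATA_SIZE = -1; // illustrative sentinel, matching the "must be lower than 0" magic-number comment

    /** Returns the total number of frame bytes to consume, or -1 if more bytes are needed. */
    static int decodableBytes(ByteBuffer buffer) {
        final int sizeHeaderLength = MARKER_BYTES_SIZE + MESSAGE_LENGTH_SIZE;
        if (buffer.remaining() < sizeHeaderLength) {
            return -1;                                   // wait for the size header
        }
        int dataLen = buffer.getInt(buffer.position() + MARKER_BYTES_SIZE);
        if (dataLen == PING_DATA_SIZE) {
            return sizeHeaderLength;                     // ping: header only, no payload
        }
        if (buffer.remaining() < dataLen + sizeHeaderLength) {
            return -1;                                   // wait for the rest of the message
        }
        return sizeHeaderLength + dataLen;               // header plus payload is now readable
    }
}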
@@ -38,6 +38,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.tasks.MockTaskManager;
@@ -184,8 +185,8 @@ public abstract class TaskManagerTestCase extends ESTestCase {
public static class TestNode implements Releasable {
public TestNode(String name, ThreadPool threadPool, Settings settings) {
transportService = new TransportService(settings,
new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry()),
threadPool) {
new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry(),
new NoneCircuitBreakerService()), threadPool) {
@Override
protected TaskManager createTaskManager() {
if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) {
@@ -40,6 +40,8 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
@@ -72,6 +74,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo;
public class BroadcastReplicationTests extends ESTestCase {

private static ThreadPool threadPool;
private static CircuitBreakerService circuitBreakerService;
private ClusterService clusterService;
private TransportService transportService;
private TestBroadcastReplicationAction broadcastReplicationAction;
@@ -79,13 +82,14 @@ public class BroadcastReplicationTests extends ESTestCase {
@BeforeClass
public static void beforeClass() {
threadPool = new ThreadPool("BroadcastReplicationTests");
circuitBreakerService = new NoneCircuitBreakerService();
}

@Override
@Before
public void setUp() throws Exception {
super.setUp();
LocalTransport transport = new LocalTransport(Settings.EMPTY, threadPool, Version.CURRENT, new NamedWriteableRegistry());
LocalTransport transport = new LocalTransport(Settings.EMPTY, threadPool, Version.CURRENT, new NamedWriteableRegistry(), circuitBreakerService);
clusterService = createClusterService(threadPool);
transportService = new TransportService(transport, threadPool);
transportService.start();
@@ -267,4 +267,4 @@ public class NodeConnectionsServiceTests extends ESTestCase {

}
}
}
}
@@ -30,7 +30,6 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.ToXContent.Params;
import org.elasticsearch.http.HttpInfo;
import org.elasticsearch.http.HttpServerAdapter;
import org.elasticsearch.http.HttpServerTransport;
@@ -43,15 +42,12 @@ import org.elasticsearch.rest.action.cat.AbstractCatAction;
import org.elasticsearch.rest.action.cat.RestNodesAction;
import org.elasticsearch.rest.action.main.RestMainAction;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.Task.Status;
import org.elasticsearch.test.transport.AssertingLocalTransport;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;

import static org.hamcrest.Matchers.sameInstance;

public class NetworkModuleTests extends ModuleTestCase {

static class FakeTransportService extends TransportService {
@@ -62,7 +58,7 @@ public class NetworkModuleTests extends ModuleTestCase {

static class FakeTransport extends AssertingLocalTransport {
public FakeTransport() {
super(null, null, null, null);
super(null, null, null, null, null);
}
}
@@ -30,6 +30,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.zen.fd.FaultDetection;
import org.elasticsearch.discovery.zen.fd.MasterFaultDetection;
import org.elasticsearch.discovery.zen.fd.NodesFaultDetection;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
@@ -52,6 +54,7 @@ import static org.hamcrest.Matchers.equalTo;
public class ZenFaultDetectionTests extends ESTestCase {
protected ThreadPool threadPool;
protected ClusterService clusterService;
private CircuitBreakerService circuitBreakerService;

protected static final Version version0 = Version.fromId(/*0*/99);
protected DiscoveryNode nodeA;
@@ -67,6 +70,7 @@ public class ZenFaultDetectionTests extends ESTestCase {
super.setUp();
threadPool = new ThreadPool(getClass().getName());
clusterService = createClusterService(threadPool);
circuitBreakerService = new NoneCircuitBreakerService();
serviceA = build(Settings.builder().put("name", "TS_A").build(), version0);
nodeA = new DiscoveryNode("TS_A", "TS_A", serviceA.boundAddress().publishAddress(), emptyMap(), emptySet(), version0);
serviceB = build(Settings.builder().put("name", "TS_B").build(), version1);
@@ -112,7 +116,7 @@ public class ZenFaultDetectionTests extends ESTestCase {
protected MockTransportService build(Settings settings, Version version) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
MockTransportService transportService = new MockTransportService(Settings.EMPTY,
new LocalTransport(settings, threadPool, version, namedWriteableRegistry), threadPool);
new LocalTransport(settings, threadPool, version, namedWriteableRegistry, circuitBreakerService), threadPool);
transportService.start();
transportService.acceptIncomingRequests();
return transportService;
@@ -33,6 +33,7 @@ import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.discovery.zen.elect.ElectMasterService;
import org.elasticsearch.discovery.zen.ping.PingContextProvider;
import org.elasticsearch.discovery.zen.ping.ZenPing;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@@ -57,7 +58,7 @@ public class UnicastZenPingIT extends ESTestCase {
NetworkService networkService = new NetworkService(settings);
ElectMasterService electMasterService = new ElectMasterService(settings, Version.CURRENT);

NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry());
NettyTransport transportA = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry(), new NoneCircuitBreakerService());
final TransportService transportServiceA = new TransportService(transportA, threadPool).start();
transportServiceA.acceptIncomingRequests();
final DiscoveryNode nodeA = new DiscoveryNode("UZP_A", transportServiceA.boundAddress().publishAddress(),
@@ -65,7 +66,7 @@ public class UnicastZenPingIT extends ESTestCase {

InetSocketTransportAddress addressA = (InetSocketTransportAddress) transportA.boundAddress().publishAddress();

NettyTransport transportB = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry());
NettyTransport transportB = new NettyTransport(settings, threadPool, networkService, BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, new NamedWriteableRegistry(), new NoneCircuitBreakerService());
final TransportService transportServiceB = new TransportService(transportB, threadPool).start();
transportServiceB.acceptIncomingRequests();
final DiscoveryNode nodeB = new DiscoveryNode("UZP_B", transportServiceA.boundAddress().publishAddress(),
@@ -33,6 +33,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@@ -73,7 +74,8 @@ public class DynamicMappingDisabledTests extends ESSingleNodeTestCase {
.put(MapperService.INDEX_MAPPER_DYNAMIC_SETTING.getKey(), false)
.build();
clusterService = createClusterService(THREAD_POOL);
transport = new LocalTransport(settings, THREAD_POOL, Version.CURRENT, new NamedWriteableRegistry());
transport = new LocalTransport(settings, THREAD_POOL, Version.CURRENT, new NamedWriteableRegistry(),
new NoneCircuitBreakerService());
transportService = new TransportService(transport, THREAD_POOL);
indicesService = getInstanceFromNode(IndicesService.class);
shardStateAction = new ShardStateAction(settings, clusterService, transportService, null, null, THREAD_POOL);
@ -19,14 +19,24 @@
|
||||
|
||||
package org.elasticsearch.indices.memory.breaker;
|
||||
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
|
||||
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
|
||||
import org.elasticsearch.action.bulk.BulkItemResponse;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchRequestBuilder;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
|
||||
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||
import org.elasticsearch.common.breaker.CircuitBreakingException;
|
||||
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.indices.breaker.BreakerSettings;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
@ -40,6 +50,7 @@ import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@ -50,6 +61,7 @@ import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures;
|
||||
import static org.hamcrest.CoreMatchers.containsString;
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
|
||||
@ -69,6 +81,9 @@ public class CircuitBreakerServiceIT extends ESIntegTestCase {
|
||||
.put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(),
|
||||
HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_LIMIT_SETTING.getDefaultRaw(null))
|
||||
.put(HierarchyCircuitBreakerService.REQUEST_CIRCUIT_BREAKER_OVERHEAD_SETTING.getKey(), 1.0)
|
||||
.put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(),
|
||||
HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getDefaultRaw(null))
|
||||
.put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_OVERHEAD_SETTING.getKey(), 1.0)
|
||||
.build();
|
||||
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(resetSettings));
|
||||
}
|
||||
@ -87,10 +102,13 @@ public class CircuitBreakerServiceIT extends ESIntegTestCase {
private boolean noopBreakerUsed() {
NodesStatsResponse stats = client().admin().cluster().prepareNodesStats().setBreaker(true).get();
for (NodeStats nodeStats : stats) {
if (nodeStats.getBreaker().getStats(CircuitBreaker.REQUEST).getLimit() == 0) {
if (nodeStats.getBreaker().getStats(CircuitBreaker.REQUEST).getLimit() == NoopCircuitBreaker.LIMIT) {
return true;
}
if (nodeStats.getBreaker().getStats(CircuitBreaker.FIELDDATA).getLimit() == 0) {
if (nodeStats.getBreaker().getStats(CircuitBreaker.IN_FLIGHT_REQUESTS).getLimit() == NoopCircuitBreaker.LIMIT) {
return true;
}
if (nodeStats.getBreaker().getStats(CircuitBreaker.FIELDDATA).getLimit() == NoopCircuitBreaker.LIMIT) {
return true;
}
}
@ -225,17 +243,19 @@ public class CircuitBreakerServiceIT extends ESIntegTestCase {
fail("should have thrown an exception");
} catch (Exception e) {
String errMsg = "[fielddata] Data too large, data for [test] would be larger than limit of [10/10b]";
assertThat("Exception: " + e.toString() + " should contain a CircuitBreakingException",
e.toString().contains(errMsg), equalTo(true));
assertThat("Exception: [" + e.toString() + "] should contain a CircuitBreakingException",
e.toString(), containsString(errMsg));
}

assertFailures(client.prepareSearch("cb-test").setQuery(matchAllQuery()).addSort("test", SortOrder.DESC),
RestStatus.INTERNAL_SERVER_ERROR,
containsString("Data too large, data for [test] would be larger than limit of [10/10b]"));

// Adjust settings so the parent breaker will fail, but the fielddata breaker doesn't
// Adjust settings so the parent breaker will fail, but neither the fielddata breaker nor the node request breaker will fail
// There is no "one size fits all" breaker size as internal request size will vary based on doc count.
int parentBreakerSize = docCount * 3;
resetSettings = Settings.builder()
.put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "15b")
.put(HierarchyCircuitBreakerService.TOTAL_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), parentBreakerSize + "b")
.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), "90%")
.put(HierarchyCircuitBreakerService.FIELDDATA_CIRCUIT_BREAKER_OVERHEAD_SETTING.getKey(), 1.0)
.build();
@ -246,9 +266,9 @@ public class CircuitBreakerServiceIT extends ESIntegTestCase {
client.prepareSearch("cb-test").setQuery(matchAllQuery()).addSort("test", SortOrder.DESC).get();
fail("should have thrown an exception");
} catch (Exception e) {
String errMsg = "[parent] Data too large, data for [test] would be larger than limit of [15/15b]";
assertThat("Exception: " +e.toString() + " should contain a CircuitBreakingException",
e.toString().contains(errMsg), equalTo(true));
String errMsg = "[parent] Data too large, data for [test] would be larger than limit of [" + parentBreakerSize;
assertThat("Exception: [" + e.toString() + "] should contain a CircuitBreakingException",
e.toString(), containsString(errMsg));
}
}

@ -280,8 +300,8 @@ public class CircuitBreakerServiceIT extends ESIntegTestCase {
fail("aggregation should have tripped the breaker");
} catch (Exception e) {
String errMsg = "CircuitBreakingException[[request] Data too large, data for [<reused_arrays>] would be larger than limit of [10/10b]]";
assertThat("Exception: " + e.toString() + " should contain a CircuitBreakingException",
e.toString().contains(errMsg), equalTo(true));
assertThat("Exception: [" + e.toString() + "] should contain a CircuitBreakingException",
e.toString(), containsString(errMsg));
}
}

@ -330,4 +350,70 @@ public class CircuitBreakerServiceIT extends ESIntegTestCase {
}
assertThat(breaks, greaterThanOrEqualTo(1));
}

public void testLimitsRequestSize() throws Exception {
ByteSizeValue inFlightRequestsLimit = new ByteSizeValue(8, ByteSizeUnit.KB);
if (noopBreakerUsed()) {
logger.info("--> noop breakers used, skipping test");
return;
}

internalCluster().ensureAtLeastNumDataNodes(2);

NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get();
List<NodeStats> dataNodeStats = new ArrayList<>();
for (NodeStats stat : nodeStats.getNodes()) {
if (stat.getNode().isDataNode()) {
dataNodeStats.add(stat);
}
}

assertThat(dataNodeStats.size(), greaterThanOrEqualTo(2));
Collections.shuffle(dataNodeStats, random());

// send bulk request from source node to target node later. The sole shard is bound to the target node.
NodeStats targetNode = dataNodeStats.get(0);
NodeStats sourceNode = dataNodeStats.get(1);

assertAcked(prepareCreate("index").setSettings(Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put("index.routing.allocation.include._name", targetNode.getNode().getName())
.put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)
));

Client client = client(sourceNode.getNode().getName());

// we use the limit size as a (very) rough indication of how many requests we should send to hit the limit
int numRequests = inFlightRequestsLimit.bytesAsInt();
BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < numRequests; i++) {
IndexRequest indexRequest = new IndexRequest("index", "type", Integer.toString(i));
indexRequest.source("field", "value", "num", i);
bulkRequest.add(indexRequest);
}

Settings limitSettings = Settings.builder()
.put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(), inFlightRequestsLimit)
.build();

assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(limitSettings));

// can either fail directly with an exception or the response can contain item-level exceptions (depending on the client)
try {
BulkResponse response = client.bulk(bulkRequest).actionGet();
if (!response.hasFailures()) {
fail("Should have thrown CircuitBreakingException");
} else {
// each item must have failed with CircuitBreakingException
for (BulkItemResponse bulkItemResponse : response) {
Throwable cause = ExceptionsHelper.unwrapCause(bulkItemResponse.getFailure().getCause());
assertThat(cause, instanceOf(CircuitBreakingException.class));
assertEquals(((CircuitBreakingException) cause).getByteLimit(), inFlightRequestsLimit.bytes());
}
}
} catch (CircuitBreakingException ex) {
assertEquals(ex.getByteLimit(), inFlightRequestsLimit.bytes());
}
}
}
|
@ -66,7 +66,7 @@ public class NettySizeHeaderFrameDecoderTests extends ESTestCase {
threadPool.setClusterSettings(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
NetworkService networkService = new NetworkService(settings);
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(settings, threadPool), new NoneCircuitBreakerService());
nettyTransport = new NettyTransport(settings, threadPool, networkService, bigArrays, Version.CURRENT, new NamedWriteableRegistry());
nettyTransport = new NettyTransport(settings, threadPool, networkService, bigArrays, Version.CURRENT, new NamedWriteableRegistry(), new NoneCircuitBreakerService());
nettyTransport.start();
TransportService transportService = new TransportService(nettyTransport, threadPool);
nettyTransport.transportServiceAdapter(transportService.createAdapter());
|
@ -24,6 +24,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.ModuleTestCase;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.test.transport.AssertingLocalTransport;
import org.elasticsearch.threadpool.ThreadPool;

@ -34,8 +35,8 @@ public class TransportModuleTests extends ModuleTestCase {

static class FakeTransport extends AssertingLocalTransport {
@Inject
public FakeTransport(Settings settings, ThreadPool threadPool, Version version, NamedWriteableRegistry namedWriteableRegistry) {
super(settings, threadPool, version, namedWriteableRegistry);
public FakeTransport(Settings settings, CircuitBreakerService circuitBreakerService, ThreadPool threadPool, Version version, NamedWriteableRegistry namedWriteableRegistry) {
super(settings, circuitBreakerService, threadPool, version, namedWriteableRegistry);
}
}

|
@ -25,6 +25,8 @@ import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.ThreadPool;
@ -53,14 +55,16 @@ public class NettyScheduledPingTests extends ESTestCase {

Settings settings = Settings.builder().put(NettyTransport.PING_SCHEDULE.getKey(), "5ms").put(TransportSettings.PORT.getKey(), 0).build();

CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService();

NamedWriteableRegistry registryA = new NamedWriteableRegistry();
final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryA);
final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryA, circuitBreakerService);
MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool);
serviceA.start();
serviceA.acceptIncomingRequests();

NamedWriteableRegistry registryB = new NamedWriteableRegistry();
final NettyTransport nettyB = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryB);
final NettyTransport nettyB = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryB, circuitBreakerService);
MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool);
serviceB.start();
serviceB.acceptIncomingRequests();
|
@ -23,7 +23,6 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
@ -31,25 +30,20 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ActionNotFoundTransportException;
import org.elasticsearch.transport.RequestHandlerRegistry;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportSettings;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;

@ -106,8 +100,9 @@ public class NettyTransportIT extends ESIntegTestCase {
}

@Inject
public ExceptionThrowingNettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, Version version, NamedWriteableRegistry namedWriteableRegistry) {
super(settings, threadPool, networkService, bigArrays, version, namedWriteableRegistry);
public ExceptionThrowingNettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
Version version, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {
super(settings, threadPool, networkService, bigArrays, version, namedWriteableRegistry, circuitBreakerService);
}

@Override
@ -130,73 +125,20 @@ public class NettyTransportIT extends ESIntegTestCase {
pipeline.replace("dispatcher", "dispatcher", new MessageChannelHandler(nettyTransport, logger, TransportSettings.DEFAULT_PROFILE) {

@Override
protected String handleRequest(Channel channel, StreamInput buffer, long requestId, Version version) throws IOException {
final String action = buffer.readString();

final NettyTransportChannel transportChannel = new NettyTransportChannel(transport, transportServiceAdapter, action, channel, requestId, version, name);
try {
final RequestHandlerRegistry reg = transportServiceAdapter.getRequestHandler(action);
if (reg == null) {
throw new ActionNotFoundTransportException(action);
}
final TransportRequest request = reg.newRequest();
request.remoteAddress(new InetSocketTransportAddress((InetSocketAddress) channel.getRemoteAddress()));
request.readFrom(buffer);
String error = threadPool.getThreadContext().getHeader("ERROR");
if (error != null) {
throw new ElasticsearchException(error);
}
if (reg.getExecutor() == ThreadPool.Names.SAME) {
//noinspection unchecked
reg.processMessageReceived(request, transportChannel);
} else {
threadPool.executor(reg.getExecutor()).execute(new RequestHandler(reg, request, transportChannel));
}
} catch (Throwable e) {
try {
transportChannel.sendResponse(e);
} catch (IOException e1) {
logger.warn("Failed to send error message back to client for action [{}]", e, action);
logger.warn("Actual Exception", e1);
}
}
channelProfileName = transportChannel.getProfileName();
protected String handleRequest(Channel channel, Marker marker, StreamInput buffer, long requestId,
int messageLengthBytes, Version version) throws IOException {
String action = super.handleRequest(channel, marker, buffer, requestId, messageLengthBytes, version);
channelProfileName = this.profileName;
return action;
}

class RequestHandler extends AbstractRunnable {
private final RequestHandlerRegistry reg;
private final TransportRequest request;
private final NettyTransportChannel transportChannel;

public RequestHandler(RequestHandlerRegistry reg, TransportRequest request, NettyTransportChannel transportChannel) {
this.reg = reg;
this.request = request;
this.transportChannel = transportChannel;
@Override
protected void validateRequest(Marker marker, StreamInput buffer, long requestId, String action) throws IOException {
super.validateRequest(marker, buffer, requestId, action);
String error = threadPool.getThreadContext().getHeader("ERROR");
if (error != null) {
throw new ElasticsearchException(error);
}

@SuppressWarnings({"unchecked"})
@Override
protected void doRun() throws Exception {
reg.processMessageReceived(request, transportChannel);
}

@Override
public boolean isForceExecution() {
return reg.isForceExecution();
}

@Override
public void onFailure(Throwable e) {
if (transport.lifecycleState() == Lifecycle.State.STARTED) {
// we can only send a response transport is started....
try {
transportChannel.sendResponse(e);
} catch (Throwable e1) {
logger.warn("Failed to send error message back to client for action [{}]", e1, reg.getAction());
logger.warn("Actual Exception", e);
}
} }
}
});
return pipeline;
|
@ -136,7 +136,8 @@ public class NettyTransportMultiPortTests extends ESTestCase {
private NettyTransport startNettyTransport(Settings settings, ThreadPool threadPool) {
BigArrays bigArrays = new MockBigArrays(new PageCacheRecycler(settings, threadPool), new NoneCircuitBreakerService());

NettyTransport nettyTransport = new NettyTransport(settings, threadPool, new NetworkService(settings), bigArrays, Version.CURRENT, new NamedWriteableRegistry());
NettyTransport nettyTransport = new NettyTransport(settings, threadPool, new NetworkService(settings), bigArrays, Version.CURRENT,
new NamedWriteableRegistry(), new NoneCircuitBreakerService());
nettyTransport.start();

assertThat(nettyTransport.lifecycleState(), is(Lifecycle.State.STARTED));
|
@ -54,3 +54,24 @@ request) from exceeding a certain amount of memory.
A constant that all request estimations are multiplied with to determine a
final estimation. Defaults to 1

[[in-flight-circuit-breaker]]
[float]
==== In flight requests circuit breaker

The in flight requests circuit breaker allows Elasticsearch to limit the memory usage of all
currently active incoming requests on transport or HTTP level from exceeding a certain amount of
memory on a node. The memory usage is based on the content length of the request itself.

`network.breaker.inflight_requests.limit`::

Limit for in flight requests breaker, defaults to 100% of JVM heap. This means that it is bound
by the limit configured for the parent circuit breaker.

`network.breaker.inflight_requests.overhead`::

A constant that all in flight requests estimations are multiplied with to determine a
final estimation. Defaults to 1
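
For quick reference, here is an editor's sketch (not part of this commit) of how the new limit can be tightened at runtime, using the same transient-settings approach as CircuitBreakerServiceIT#testLimitsRequestSize above. It assumes it runs inside an ESIntegTestCase subclass with that test's imports; the variable name inFlightLimit is illustrative only.

// Editor's sketch: lower the in-flight requests breaker to 8kb via a transient cluster setting,
// mirroring the limitSettings block in the integration test in this commit.
Settings inFlightLimit = Settings.builder()
    .put(HierarchyCircuitBreakerService.IN_FLIGHT_REQUESTS_CIRCUIT_BREAKER_LIMIT_SETTING.getKey(),
        new ByteSizeValue(8, ByteSizeUnit.KB))
    .build();
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(inFlightLimit));
// Reset afterwards by putting the default back, as resetSettings does in the test's setup.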

[[http-circuit-breaker]]
[float]
|
@ -994,7 +994,6 @@ public final class InternalTestCluster extends TestCluster {
assertShardIndexCounter();
//check that shards that have same sync id also contain same number of documents
assertSameSyncIdSameDocs();

}

private void assertSameSyncIdSameDocs() {
@ -1852,6 +1851,7 @@ public final class InternalTestCluster extends TestCluster {
@Override
public void assertAfterTest() throws IOException {
super.assertAfterTest();
assertRequestsFinished();
for (NodeEnvironment env : this.getInstances(NodeEnvironment.class)) {
Set<ShardId> shardIds = env.lockedShards();
for (ShardId id : shardIds) {
@ -1864,6 +1864,27 @@ public final class InternalTestCluster extends TestCluster {
}
}

private void assertRequestsFinished() {
if (size() > 0) {
for (NodeAndClient nodeAndClient : nodes.values()) {
CircuitBreaker inFlightRequestsBreaker = getInstance(HierarchyCircuitBreakerService.class, nodeAndClient.name)
.getBreaker(CircuitBreaker.IN_FLIGHT_REQUESTS);
try {
// see #ensureEstimatedStats()
assertBusy(() -> {
// ensure that our size accounting on transport level is reset properly
long bytesUsed = inFlightRequestsBreaker.getUsed();
assertThat("All incoming requests on node [" + nodeAndClient.name + "] should have finished. Expected 0 but got " +
bytesUsed, bytesUsed, equalTo(0L));
});
} catch (Exception e) {
logger.error("Could not assert finished requests within timeout", e);
fail("Could not assert finished requests within timeout on node [" + nodeAndClient.name + "]");
}
}
}
}

/**
* Simple interface that allows to wait for an async operation to finish
* @param <T> the result of the async execution
|
@ -28,6 +28,7 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.VersionUtils;
@ -79,8 +80,8 @@ public class AssertingLocalTransport extends LocalTransport {
private final Version maxVersion;

@Inject
public AssertingLocalTransport(Settings settings, ThreadPool threadPool, Version version, NamedWriteableRegistry namedWriteableRegistry) {
super(settings, threadPool, version, namedWriteableRegistry);
public AssertingLocalTransport(Settings settings, CircuitBreakerService circuitBreakerService, ThreadPool threadPool, Version version, NamedWriteableRegistry namedWriteableRegistry) {
super(settings, threadPool, version, namedWriteableRegistry, circuitBreakerService);
final long seed = ESIntegTestCase.INDEX_TEST_SEED_SETTING.get(settings);
random = new Random(seed);
minVersion = ASSERTING_TRANSPORT_MIN_VERSION_KEY.get(settings);
|
@ -37,6 +37,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.tasks.MockTaskManager;
@ -98,14 +99,14 @@ public class MockTransportService extends TransportService {

public static MockTransportService local(Settings settings, Version version, ThreadPool threadPool) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
Transport transport = new LocalTransport(settings, threadPool, version, namedWriteableRegistry);
Transport transport = new LocalTransport(settings, threadPool, version, namedWriteableRegistry, new NoneCircuitBreakerService());
return new MockTransportService(settings, transport, threadPool);
}

public static MockTransportService nettyFromThreadPool(Settings settings, Version version, ThreadPool threadPool) {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
Transport transport = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE,
version, namedWriteableRegistry);
version, namedWriteableRegistry, new NoneCircuitBreakerService());
return new MockTransportService(Settings.EMPTY, transport, threadPool);
}