Settings queue_size on index/bulk TP can cause rejection failures when executed over network

The #3526 fix was not complete, it handled cases of on node execution, but didn't properly handle cases where it was executed over the network, and forcing the execution of the replica operation when done over the wire.

This relates to #3854

closes #3929
This commit is contained in:
Shay Banon 2013-10-17 17:06:34 +03:00
parent adf0c8424b
commit c9b0e1de6c
6 changed files with 59 additions and 5 deletions

View File

@ -234,6 +234,12 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
return executor;
}
// we must never reject on because of thread pool capacity on replicas
@Override
public boolean isForceExecution() {
return true;
}
@Override
public void messageReceived(final ReplicaOperationRequest request, final TransportChannel channel) throws Exception {
shardOperationOnReplica(request);

View File

@ -99,7 +99,7 @@ public class LocalAllocateDangledIndices extends AbstractComponent {
void onFailure(Throwable e);
}
class AllocateDangledRequestHandler implements TransportRequestHandler<AllocateDangledRequest> {
class AllocateDangledRequestHandler extends BaseTransportRequestHandler<AllocateDangledRequest> {
public static final String ACTION = "/gateway/local/allocate_dangled";

View File

@ -24,4 +24,11 @@ package org.elasticsearch.transport;
*/
public abstract class BaseTransportRequestHandler<T extends TransportRequest> implements TransportRequestHandler<T> {
/**
* Default force execution to false.
*/
@Override
public boolean isForceExecution() {
return false;
}
}

View File

@ -29,4 +29,9 @@ public interface TransportRequestHandler<T extends TransportRequest> {
void messageReceived(T request, TransportChannel channel) throws Exception;
String executor();
/**
* See {@link org.elasticsearch.common.util.concurrent.AbstractRunnable#isForceExecution()}.
*/
boolean isForceExecution();
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.ThrowableObjectInputStream;
import org.elasticsearch.common.io.stream.*;
@ -31,6 +32,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
import org.elasticsearch.common.transport.LocalTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.support.TransportStatus;
@ -186,7 +188,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
return this.threadPool;
}
protected void messageReceived(byte[] data, String action, LocalTransport sourceTransport, Version version, @Nullable final Long sendRequestId) {
protected void messageReceived(byte[] data, String action, LocalTransport sourceTransport, Version version, @Nullable final Long sendRequestId) {
try {
transportServiceAdapter.received(data.length);
StreamInput stream = new BytesStreamInput(data, false);
@ -232,7 +234,35 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
}
final TransportRequest request = handler.newInstance();
request.readFrom(stream);
handler.messageReceived(request, transportChannel);
if (handler.executor() == ThreadPool.Names.SAME) {
//noinspection unchecked
handler.messageReceived(request, transportChannel);
} else {
threadPool.executor(handler.executor()).execute(new AbstractRunnable() {
@Override
public void run() {
try {
//noinspection unchecked
handler.messageReceived(request, transportChannel);
} catch (Throwable e) {
if (lifecycleState() == Lifecycle.State.STARTED) {
// we can only send a response transport is started....
try {
transportChannel.sendResponse(e);
} catch (Throwable e1) {
logger.warn("Failed to send error message back to client for action [" + action + "]", e1);
logger.warn("Actual Exception", e);
}
}
}
}
@Override
public boolean isForceExecution() {
return handler.isForceExecution();
}
});
}
} catch (Throwable e) {
try {
transportChannel.sendResponse(e);
@ -254,7 +284,7 @@ public class LocalTransport extends AbstractLifecycleComponent<Transport> implem
}
handleParsedRespone(response, handler);
}
protected void handleParsedRespone(final TransportResponse response, final TransportResponseHandler handler) {
threadPool.executor(handler.executor()).execute(new Runnable() {
@SuppressWarnings({"unchecked"})

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.io.ThrowableObjectInputStream;
import org.elasticsearch.common.io.stream.CachedStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.support.TransportStatus;
@ -249,7 +250,7 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
}
}
class RequestHandler implements Runnable {
class RequestHandler extends AbstractRunnable {
private final TransportRequestHandler handler;
private final TransportRequest request;
private final NettyTransportChannel transportChannel;
@ -279,5 +280,10 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
}
}
}
@Override
public boolean isForceExecution() {
return handler.isForceExecution();
}
}
}