Get API: Will always prefer first local execution, regardless of the preference, closes #1153.

This commit is contained in:
Shay Banon 2011-07-24 20:33:49 +03:00
parent b31f68a0eb
commit 5ce42b337d
1 changed files with 48 additions and 66 deletions

View File

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Streamable;
@ -117,7 +118,7 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
} }
public void start() { public void start() {
performFirst(); perform(null);
} }
private void onFailure(ShardRouting shardRouting, Exception e) { private void onFailure(ShardRouting shardRouting, Exception e) {
@ -127,50 +128,44 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
perform(e); perform(e);
} }
/** private void perform(@Nullable final Exception lastException) {
* First get should try and use a shard that exists on a local node for better performance final ShardRouting shardRouting = shardIt.nextActiveOrNull();
*/ if (shardRouting == null) {
private void performFirst() { Exception failure = lastException;
while (shardIt.hasNextActive()) { if (failure == null) {
final ShardRouting shard = shardIt.nextActive(); failure = new NoShardAvailableActionException(shardIt.shardId(), "No shard available for [" + request + "]");
if (shard.currentNodeId().equals(nodes.localNodeId())) { } else {
if (logger.isDebugEnabled()) {
logger.debug(shardIt.shardId() + ": Failed to get [{}]", failure, request);
}
}
listener.onFailure(failure);
return;
}
if (shardRouting.currentNodeId().equals(nodes.localNodeId())) {
if (request.operationThreaded()) { if (request.operationThreaded()) {
threadPool.executor(executor).execute(new Runnable() { threadPool.executor(executor).execute(new Runnable() {
@Override public void run() { @Override public void run() {
try { try {
Response response = shardOperation(request, shard.id()); Response response = shardOperation(request, shardRouting.id());
listener.onResponse(response); listener.onResponse(response);
} catch (Exception e) { } catch (Exception e) {
onFailure(shard, e); onFailure(shardRouting, e);
} }
} }
}); });
return;
} else { } else {
try { try {
final Response response = shardOperation(request, shard.id()); final Response response = shardOperation(request, shardRouting.id());
listener.onResponse(response); listener.onResponse(response);
return;
} catch (Exception e) { } catch (Exception e) {
onFailure(shard, e); onFailure(shardRouting, e);
} }
} }
} } else {
} DiscoveryNode node = nodes.get(shardRouting.currentNodeId());
if (!shardIt.hasNextActive()) { transportService.sendRequest(node, transportShardAction, new ShardSingleOperationRequest(request, shardRouting.id()), new BaseTransportResponseHandler<Response>() {
// no local node get, go remote
shardIt.reset();
perform(null);
}
}
private void perform(final Exception lastException) {
while (shardIt.hasNextActive()) {
final ShardRouting shard = shardIt.nextActive();
// no need to check for local nodes, we tried them already in performFirstGet
if (!shard.currentNodeId().equals(nodes.localNodeId())) {
DiscoveryNode node = nodes.get(shard.currentNodeId());
transportService.sendRequest(node, transportShardAction, new ShardSingleOperationRequest(request, shard.id()), new BaseTransportResponseHandler<Response>() {
@Override public Response newInstance() { @Override public Response newInstance() {
return newResponse(); return newResponse();
@ -185,22 +180,9 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
} }
@Override public void handleException(TransportException exp) { @Override public void handleException(TransportException exp) {
onFailure(shard, exp); onFailure(shardRouting, exp);
} }
}); });
return;
}
}
if (!shardIt.hasNextActive()) {
Exception failure = lastException;
if (failure == null) {
failure = new NoShardAvailableActionException(shardIt.shardId(), "No shard available for [" + request + "]");
} else {
if (logger.isDebugEnabled()) {
logger.debug(shardIt.shardId() + ": Failed to get [{}]", failure, request);
}
}
listener.onFailure(failure);
} }
} }
} }