From 2bb31fe74037a7ee2a04c9a994bc4bacbc8e8102 Mon Sep 17 00:00:00 2001 From: kimchy Date: Wed, 7 Apr 2010 01:54:33 +0300 Subject: [PATCH] reafactor how actions handle failures, better response when non active shards exists, also, default logging to have action set to DEBUG so exceptions in actions are logged in the server --- config/logging.yml | 2 + .../NoShardAvailableActionException.java | 4 + .../ping/broadcast/BroadcastPingResponse.java | 4 +- .../TransportBroadcastPingAction.java | 2 +- .../ShardReplicationPingRequest.java | 4 + .../clear/ClearIndicesCacheResponse.java | 4 +- .../TransportClearIndicesCacheAction.java | 8 +- .../admin/indices/flush/FlushResponse.java | 4 +- .../indices/flush/TransportFlushAction.java | 8 +- .../snapshot/ShardGatewaySnapshotRequest.java | 4 + .../indices/optimize/OptimizeResponse.java | 4 +- .../optimize/TransportOptimizeAction.java | 8 +- .../indices/refresh/RefreshResponse.java | 4 +- .../refresh/TransportRefreshAction.java | 8 +- .../indices/status/IndicesStatusResponse.java | 6 +- .../status/TransportIndicesStatusAction.java | 10 +- .../action/count/CountRequest.java | 2 +- .../action/count/CountResponse.java | 4 +- .../action/count/TransportCountAction.java | 2 +- .../ShardDeleteByQueryRequest.java | 6 + .../type/TransportSearchTypeAction.java | 87 ++++++++---- .../broadcast/BroadcastOperationResponse.java | 9 +- ...roadcastShardOperationFailedException.java | 2 +- .../TransportBroadcastOperationAction.java | 81 ++++++++---- .../ShardReplicationOperationRequest.java | 2 +- ...nsportShardReplicationOperationAction.java | 3 + .../TransportSingleOperationAction.java | 48 ++++--- .../action/terms/TermsResponse.java | 4 +- .../action/terms/TransportTermsAction.java | 2 +- .../routing/CompoundShardsIterator.java | 105 --------------- .../cluster/routing/GroupShardsIterator.java | 8 ++ .../cluster/routing/IndexRoutingTable.java | 2 +- .../routing/IndexShardRoutingTable.java | 48 ++++++- .../cluster/routing/PlainShardsIterator.java | 53 +++++++- .../cluster/routing/ShardsIterator.java | 9 ++ .../search/SearchShardTarget.java | 2 +- .../broadcast/BroadcastActionsTests.java | 119 +++++++++++++++++ .../broadcast/BroadcastActionsTests.yml | 7 + .../recovery/SimpleRecoveryTests.java | 37 +++++- .../search/TransportSearchFailuresTests.java | 124 ++++++++++++++++++ .../search/TransportSearchFailuresTests.yml | 9 ++ .../TransportTwoServersSearchTests.java | 8 +- 42 files changed, 628 insertions(+), 239 deletions(-) delete mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/CompoundShardsIterator.java create mode 100644 modules/test/integration/src/test/java/org/elasticsearch/test/integration/broadcast/BroadcastActionsTests.java create mode 100644 modules/test/integration/src/test/java/org/elasticsearch/test/integration/broadcast/BroadcastActionsTests.yml create mode 100644 modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TransportSearchFailuresTests.java create mode 100644 modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/TransportSearchFailuresTests.yml diff --git a/config/logging.yml b/config/logging.yml index e87252e8ed1..a08a2c224a2 100644 --- a/config/logging.yml +++ b/config/logging.yml @@ -1,6 +1,8 @@ rootLogger: INFO, console, file logger: jgroups: WARN + # log action execution errors for easier debugging + action : DEBUG appender: console: diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/NoShardAvailableActionException.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/NoShardAvailableActionException.java index 41dedc96980..19976e4de6a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/NoShardAvailableActionException.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/NoShardAvailableActionException.java @@ -27,6 +27,10 @@ import org.elasticsearch.index.shard.ShardId; */ public class NoShardAvailableActionException extends IndexShardException { + public NoShardAvailableActionException(ShardId shardId, String msg) { + super(shardId, msg); + } + public NoShardAvailableActionException(ShardId shardId, String msg, Throwable cause) { super(shardId, msg, cause); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/BroadcastPingResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/BroadcastPingResponse.java index 9c99bf5889f..58dc6005901 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/BroadcastPingResponse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/BroadcastPingResponse.java @@ -36,8 +36,8 @@ public class BroadcastPingResponse extends BroadcastOperationResponse { } - public BroadcastPingResponse(int successfulShards, int failedShards, List shardFailures) { - super(successfulShards, failedShards, shardFailures); + public BroadcastPingResponse(int totalShards, int successfulShards, int failedShards, List shardFailures) { + super(totalShards, successfulShards, failedShards, shardFailures); } @Override public void readFrom(StreamInput in) throws IOException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/TransportBroadcastPingAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/TransportBroadcastPingAction.java index 10f2edf110d..fc281fe5af6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/TransportBroadcastPingAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/broadcast/TransportBroadcastPingAction.java @@ -83,7 +83,7 @@ public class TransportBroadcastPingAction extends TransportBroadcastOperationAct successfulShards++; } } - return new BroadcastPingResponse(successfulShards, failedShards, shardFailures); + return new BroadcastPingResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures); } @Override protected BroadcastShardPingRequest newShardRequest() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/ShardReplicationPingRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/ShardReplicationPingRequest.java index 05118dcd28c..e317ef09925 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/ShardReplicationPingRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/ShardReplicationPingRequest.java @@ -58,4 +58,8 @@ public class ShardReplicationPingRequest extends ShardReplicationOperationReques super.writeTo(out); out.writeVInt(shardId); } + + @Override public String toString() { + return "[" + index + "][" + shardId + "]"; + } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/ClearIndicesCacheResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/ClearIndicesCacheResponse.java index 06acdd2018a..d6038f2269a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/ClearIndicesCacheResponse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/ClearIndicesCacheResponse.java @@ -38,8 +38,8 @@ public class ClearIndicesCacheResponse extends BroadcastOperationResponse { } - ClearIndicesCacheResponse(int successfulShards, int failedShards, List shardFailures) { - super(successfulShards, failedShards, shardFailures); + ClearIndicesCacheResponse(int totalShards, int successfulShards, int failedShards, List shardFailures) { + super(totalShards, successfulShards, failedShards, shardFailures); } @Override public void readFrom(StreamInput in) throws IOException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java index 54407b6c14a..6842cbb87f4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/cache/clear/TransportClearIndicesCacheAction.java @@ -66,6 +66,10 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio return new ClearIndicesCacheRequest(); } + @Override protected boolean ignoreNonActiveExceptions() { + return true; + } + @Override protected ClearIndicesCacheResponse newResponse(ClearIndicesCacheRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) { int successfulShards = 0; int failedShards = 0; @@ -73,7 +77,7 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio for (int i = 0; i < shardsResponses.length(); i++) { Object shardResponse = shardsResponses.get(i); if (shardResponse == null) { - failedShards++; + // simply ignore non active shards } else if (shardResponse instanceof BroadcastShardOperationFailedException) { failedShards++; if (shardFailures == null) { @@ -84,7 +88,7 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio successfulShards++; } } - return new ClearIndicesCacheResponse(successfulShards, failedShards, shardFailures); + return new ClearIndicesCacheResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures); } @Override protected ShardClearIndicesCacheRequest newShardRequest() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushResponse.java index c66197615cb..5208af8e155 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushResponse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/FlushResponse.java @@ -38,8 +38,8 @@ public class FlushResponse extends BroadcastOperationResponse { } - FlushResponse(int successfulShards, int failedShards, List shardFailures) { - super(successfulShards, failedShards, shardFailures); + FlushResponse(int totalShards, int successfulShards, int failedShards, List shardFailures) { + super(totalShards, successfulShards, failedShards, shardFailures); } @Override public void readFrom(StreamInput in) throws IOException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java index bb4b383b2bc..c7d801b4283 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportFlushAction.java @@ -65,6 +65,10 @@ public class TransportFlushAction extends TransportBroadcastOperationAction shardFailures) { - super(successfulShards, failedShards, shardFailures); + OptimizeResponse(int totalShards, int successfulShards, int failedShards, List shardFailures) { + super(totalShards, successfulShards, failedShards, shardFailures); } @Override public void readFrom(StreamInput in) throws IOException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java index 7468417f8c7..90828298978 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/optimize/TransportOptimizeAction.java @@ -66,6 +66,10 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction shardFailures) { - super(successfulShards, failedShards, shardFailures); + RefreshResponse(int totalShards, int successfulShards, int failedShards, List shardFailures) { + super(totalShards, successfulShards, failedShards, shardFailures); } @Override public void readFrom(StreamInput in) throws IOException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java index 21331dae70b..0a0bbb2cb41 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportRefreshAction.java @@ -66,6 +66,10 @@ public class TransportRefreshAction extends TransportBroadcastOperationAction shardFailures) { - super(successfulShards, failedShards, shardFailures); + IndicesStatusResponse(ShardStatus[] shards, ClusterState clusterState, int totalShards, int successfulShards, int failedShards, List shardFailures) { + super(totalShards, successfulShards, failedShards, shardFailures); this.shards = shards; indicesSettings = newHashMap(); for (ShardStatus shard : shards) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java index 65a07600306..59b55d28b95 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java @@ -45,7 +45,7 @@ import java.util.concurrent.atomic.AtomicReferenceArray; import static com.google.common.collect.Lists.*; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class TransportIndicesStatusAction extends TransportBroadcastOperationAction { @@ -65,6 +65,10 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct return new IndicesStatusRequest(); } + @Override protected boolean ignoreNonActiveExceptions() { + return true; + } + @Override protected IndicesStatusResponse newResponse(IndicesStatusRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) { int successfulShards = 0; int failedShards = 0; @@ -73,7 +77,7 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct for (int i = 0; i < shardsResponses.length(); i++) { Object shardResponse = shardsResponses.get(i); if (shardResponse == null) { - failedShards++; + // simply ignore non active shards } else if (shardResponse instanceof BroadcastShardOperationFailedException) { failedShards++; if (shardFailures == null) { @@ -85,7 +89,7 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct successfulShards++; } } - return new IndicesStatusResponse(shards.toArray(new ShardStatus[shards.size()]), clusterState, successfulShards, failedShards, shardFailures); + return new IndicesStatusResponse(shards.toArray(new ShardStatus[shards.size()]), clusterState, shardsResponses.length(), successfulShards, failedShards, shardFailures); } @Override protected IndexShardStatusRequest newShardRequest() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/CountRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/CountRequest.java index 76e93fb5c4e..ce45953adc1 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/CountRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/CountRequest.java @@ -201,6 +201,6 @@ public class CountRequest extends BroadcastOperationRequest { } @Override public String toString() { - return "[" + Arrays.toString(indices) + "][" + Arrays.toString(types) + "], querySource[" + Unicode.fromBytes(querySource) + "]"; + return "[" + Arrays.toString(indices) + "]" + Arrays.toString(types) + ", querySource[" + Unicode.fromBytes(querySource) + "]"; } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/CountResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/CountResponse.java index c562d23db3d..1ab7c19ebcf 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/CountResponse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/CountResponse.java @@ -40,8 +40,8 @@ public class CountResponse extends BroadcastOperationResponse { } - CountResponse(long count, int successfulShards, int failedShards, List shardFailures) { - super(successfulShards, failedShards, shardFailures); + CountResponse(long count, int totalShards, int successfulShards, int failedShards, List shardFailures) { + super(totalShards, successfulShards, failedShards, shardFailures); this.count = count; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/TransportCountAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/TransportCountAction.java index 4ff98345803..582abed2c90 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/TransportCountAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/count/TransportCountAction.java @@ -98,7 +98,7 @@ public class TransportCountAction extends TransportBroadcastOperationAction shardIt) { - final ShardRouting shard = shardIt.next(); - if (!shard.active()) { - // as if we have a "problem", so we iterate to the next one and maintain counts + private void performFirstPhase(final ShardsIterator shardIt) { + final ShardRouting shard = shardIt.nextActiveOrNull(); + if (shard == null) { + // no more active shards... (we should not really get here, but just for safety) onFirstPhaseResult(shard, shardIt, null); } else { Node node = nodes.get(shard.currentNodeId()); @@ -182,13 +182,13 @@ public abstract class TransportSearchTypeAction extends BaseAction shardIt) { + private void onFirstPhaseResult(ShardRouting shard, FirstResult result, ShardsIterator shardIt) { processFirstPhaseResult(shard, result); // increment all the "future" shards to update the total ops since we some may work and some may not... // and when that happens, we break on total ops, so we must maintain them - while (shardIt.hasNext()) { + while (shardIt.hasNextActive()) { totalOps.incrementAndGet(); - shardIt.next(); + shardIt.nextActive(); } if (successulOps.incrementAndGet() == expectedSuccessfulOps || totalOps.incrementAndGet() == expectedTotalOps) { @@ -200,15 +200,25 @@ public abstract class TransportSearchTypeAction extends BaseAction shardIt, Throwable t) { - if (logger.isDebugEnabled()) { - if (t != null) { - logger.debug(shard.shortSummary() + ": Failed to search [" + request + "]", t); - } - } + private void onFirstPhaseResult(ShardRouting shard, final ShardsIterator shardIt, Throwable t) { if (totalOps.incrementAndGet() == expectedTotalOps) { + // e is null when there is no next active.... + if (logger.isDebugEnabled()) { + if (t != null) { + if (shard != null) { + logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", t); + } else { + logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "]", t); + } + } + } // no more shards, add a failure - shardFailures.add(new ShardSearchFailure(t)); + if (t == null) { + // no active shards + shardFailures.add(new ShardSearchFailure("No active shards", new SearchShardTarget(null, shardIt.shardId().index().name(), shardIt.shardId().id()))); + } else { + shardFailures.add(new ShardSearchFailure(t)); + } if (successulOps.get() == 0) { // no successful ops, raise an exception invokeListener(new SearchPhaseExecutionException(firstPhaseName(), "total failure", buildShardFailures())); @@ -220,11 +230,36 @@ public abstract class TransportSearchTypeAction extends BaseAction shardFailures) { + protected BroadcastOperationResponse(int totalShards, int successfulShards, int failedShards, List shardFailures) { + this.totalShards = totalShards; this.successfulShards = successfulShards; this.failedShards = failedShards; this.shardFailures = shardFailures; @@ -60,7 +63,7 @@ public abstract class BroadcastOperationResponse implements ActionResponse { * The total shards this request ran against. */ public int totalShards() { - return successfulShards + failedShards; + return totalShards; } /** @@ -88,6 +91,7 @@ public abstract class BroadcastOperationResponse implements ActionResponse { } @Override public void readFrom(StreamInput in) throws IOException { + totalShards = in.readVInt(); successfulShards = in.readVInt(); failedShards = in.readVInt(); int size = in.readVInt(); @@ -100,6 +104,7 @@ public abstract class BroadcastOperationResponse implements ActionResponse { } @Override public void writeTo(StreamOutput out) throws IOException { + out.writeVInt(totalShards); out.writeVInt(successfulShards); out.writeVInt(failedShards); out.writeVInt(shardFailures.size()); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastShardOperationFailedException.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastShardOperationFailedException.java index 99e325f13a4..52fc31da7f0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastShardOperationFailedException.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/BroadcastShardOperationFailedException.java @@ -26,7 +26,7 @@ import org.elasticsearch.index.shard.ShardId; /** * An exception indicating that a failure occurred performing an operation on the shard. * - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class BroadcastShardOperationFailedException extends IndexShardException implements ElasticSearchWrapperException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java index 9a75876b49c..585a8270ffd 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java @@ -42,7 +42,6 @@ import org.elasticsearch.util.io.stream.Streamable; import org.elasticsearch.util.settings.Settings; import java.io.IOException; -import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; @@ -96,10 +95,14 @@ public abstract class TransportBroadcastOperationAction shardIt, boolean localAsync) { - final ShardRouting shard = shardIt.next(); - if (!shard.active()) { - // as if we have a "problem", so we iterate to the next one and maintain counts - onOperation(shard, shardIt, new BroadcastShardOperationFailedException(shard.shardId(), "Not Active"), false); + private void performOperation(final ShardsIterator shardIt, boolean localAsync) { + final ShardRouting shard = shardIt.nextActiveOrNull(); + if (shard == null) { + // no more active shards... (we should not really get here, just safety) + onOperation(shard, shardIt, null, false); } else { final ShardRequest shardRequest = newShardRequest(shard, request); if (shard.currentNodeId().equals(nodes.localNodeId())) { @@ -223,8 +226,8 @@ public abstract class TransportBroadcastOperationAction shardIt, Exception e, boolean alreadyThreaded) { - if (logger.isDebugEnabled()) { - if (e != null) { - logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", e); + @SuppressWarnings({"unchecked"}) + private void onOperation(ShardRouting shard, final ShardsIterator shardIt, Throwable t, boolean alreadyThreaded) { + if (!shardIt.hasNextActive()) { + // e is null when there is no next active.... + if (logger.isDebugEnabled()) { + if (t != null) { + if (shard != null) { + logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", t); + } else { + logger.debug(shardIt.shardId() + ": Failed to execute [" + request + "]", t); + } + } } - } - if (!shardIt.hasNext()) { - // no more shards in this partition + // no more shards in this group int index = indexCounter.getAndIncrement(); if (accumulateExceptions()) { - if (!(e instanceof BroadcastShardOperationFailedException)) { - e = new BroadcastShardOperationFailedException(shard.shardId(), e); + if (t == null) { + if (!ignoreNonActiveExceptions()) { + t = new BroadcastShardOperationFailedException(shardIt.shardId(), "No active shard(s)"); + } + } else if (!(t instanceof BroadcastShardOperationFailedException)) { + t = new BroadcastShardOperationFailedException(shardIt.shardId(), t); } - shardsResponses.set(index, e); + shardsResponses.set(index, t); } if (expectedOps == counterOps.incrementAndGet()) { finishHim(alreadyThreaded); } return; + } else { + // trace log this exception + if (logger.isTraceEnabled()) { + if (t != null) { + if (shard != null) { + logger.trace(shard.shortSummary() + ": Failed to execute [" + request + "]", t); + } else { + logger.trace(shardIt.shardId() + ": Failed to execute [" + request + "]", t); + } + } + } } // we are not threaded here if we got here from the transport // or we possibly threaded if we got from a local threaded one, diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java index 28a5a17ba0c..86fa6102842 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java @@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit; import static org.elasticsearch.action.Actions.*; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public abstract class ShardReplicationOperationRequest implements ActionRequest { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index 27d245fd6a1..b50c02e9dca 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -330,6 +330,9 @@ public abstract class TransportShardReplicationOperationAction extends BaseAction { @@ -83,9 +82,7 @@ public abstract class TransportSingleOperationAction listener; - private final ShardsIterator shards; - - private Iterator shardsIt; + private final ShardsIterator shardsIt; private final Request request; @@ -102,18 +99,17 @@ public abstract class TransportSingleOperationAction shardFailures, FieldTermsFreq[] fieldsTermsFreq, + TermsResponse(int totalShards, int successfulShards, int failedShards, List shardFailures, FieldTermsFreq[] fieldsTermsFreq, long numDocs, long maxDoc, long numDeletedDocs) { - super(successfulShards, failedShards, shardFailures); + super(totalShards, successfulShards, failedShards, shardFailures); this.fieldsTermsFreq = fieldsTermsFreq; this.numDocs = numDocs; this.maxDoc = maxDoc; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/terms/TransportTermsAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/terms/TransportTermsAction.java index ad119802090..7844209048d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/terms/TransportTermsAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/terms/TransportTermsAction.java @@ -140,7 +140,7 @@ public class TransportTermsAction extends TransportBroadcastOperationAction { - - private int index = 0; - - private final List iterators; - - private Iterator current; - - public CompoundShardsIterator(List iterators) { - this.iterators = iterators; - } - - @Override public ShardsIterator reset() { - for (ShardsIterator it : iterators) { - it.reset(); - } - index = 0; - current = null; - return this; - } - - @Override public int size() { - int size = 0; - for (ShardsIterator it : iterators) { - size += it.size(); - } - return size; - } - - @Override public boolean hasNext() { - if (index == iterators.size()) { - return false; - } - if (current == null) { - current = iterators.get(index).iterator(); - } - while (!current.hasNext()) { - if (++index == iterators.size()) { - return false; - } - current = iterators.get(index).iterator(); - } - return true; - } - - @Override public ShardRouting next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - return current.next(); - } - - @Override public void remove() { - throw new UnsupportedOperationException(); - } - - @Override public ShardId shardId() { - return currentShardsIterator().shardId(); - } - - @Override public Iterator iterator() { - return this; - } - - private ShardsIterator currentShardsIterator() throws NoSuchElementException { - if (iterators.size() == 0) { - throw new NoSuchElementException(); - } - if (index == iterators.size()) { - return iterators.get(index - 1); - } - return iterators.get(index); - - } -} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/GroupShardsIterator.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/GroupShardsIterator.java index b8dae204a92..40d4d305a3d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/GroupShardsIterator.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/GroupShardsIterator.java @@ -57,6 +57,14 @@ public class GroupShardsIterator implements Iterable { return size; } + public int totalSizeActive() { + int size = 0; + for (ShardsIterator shard : iterators) { + size += shard.sizeActive(); + } + return size; + } + public int size() { return iterators.size(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java index 102e823a5c4..b569ffe5da9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexRoutingTable.java @@ -110,7 +110,7 @@ public class IndexRoutingTable implements Iterable { * A groups shards iterator where each groups is a single {@link ShardRouting} and a group * is created for each shard routing. * - *

This basically means that components that use the {@link GroupShardsIterator} will itearte + *

This basically means that components that use the {@link GroupShardsIterator} will iterate * over *all* the shards (all the replicas) within the index. */ public GroupShardsIterator groupByAllIt() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index 4cc30ef4a49..7f1d0e4c94b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -106,7 +106,7 @@ public class IndexShardRoutingTable implements Iterable { *

The class can be used from different threads, though not designed to be used concurrently * from different threads. */ - private class IndexShardsIterator implements ShardsIterator, Iterator { + class IndexShardsIterator implements ShardsIterator, Iterator { private final int origIndex; @@ -130,17 +130,47 @@ public class IndexShardRoutingTable implements Iterable { } @Override public boolean hasNext() { - return counter != size(); + return counter < size(); } - @Override public ShardRouting next() { + @Override public ShardRouting next() throws NoSuchElementException { if (!hasNext()) { - throw new NoSuchElementException(); + throw new NoSuchElementException("No shard found"); } counter++; return shardModulo(index++); } + @Override public boolean hasNextActive() { + int counter = this.counter; + int index = this.index; + while (counter++ < size()) { + ShardRouting shardRouting = shardModulo(index++); + if (shardRouting.active()) { + return true; + } + } + return false; + } + + @Override public ShardRouting nextActive() throws NoSuchElementException { + ShardRouting shardRouting = nextActiveOrNull(); + if (shardRouting == null) { + throw new NoSuchElementException("No active shard found"); + } + return shardRouting; + } + + @Override public ShardRouting nextActiveOrNull() throws NoSuchElementException { + while (counter++ < size()) { + ShardRouting shardRouting = shardModulo(index++); + if (shardRouting.active()) { + return shardRouting; + } + } + return null; + } + @Override public void remove() { throw new UnsupportedOperationException(); } @@ -149,6 +179,16 @@ public class IndexShardRoutingTable implements Iterable { return IndexShardRoutingTable.this.size(); } + @Override public int sizeActive() { + int shardsActive = 0; + for (ShardRouting shardRouting : IndexShardRoutingTable.this.shards()) { + if (shardRouting.active()) { + shardsActive++; + } + } + return shardsActive; + } + @Override public ShardId shardId() { return IndexShardRoutingTable.this.shardId(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java index dbe837f703c..59cb297c803 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java @@ -23,9 +23,10 @@ import org.elasticsearch.index.shard.ShardId; import java.util.Iterator; import java.util.List; +import java.util.NoSuchElementException; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class PlainShardsIterator implements ShardsIterator { @@ -33,16 +34,15 @@ public class PlainShardsIterator implements ShardsIterator { private final List shards; - private Iterator iterator; + private volatile int counter = 0; public PlainShardsIterator(ShardId shardId, List shards) { this.shardId = shardId; this.shards = shards; - this.iterator = shards.iterator(); } @Override public ShardsIterator reset() { - this.iterator = shards.iterator(); + this.counter = 0; return this; } @@ -50,6 +50,16 @@ public class PlainShardsIterator implements ShardsIterator { return shards.size(); } + @Override public int sizeActive() { + int sizeActive = 0; + for (ShardRouting shardRouting : shards) { + if (shardRouting.active()) { + sizeActive++; + } + } + return sizeActive; + } + @Override public ShardId shardId() { return this.shardId; } @@ -59,11 +69,42 @@ public class PlainShardsIterator implements ShardsIterator { } @Override public boolean hasNext() { - return iterator.hasNext(); + return counter < shards.size(); } @Override public ShardRouting next() { - return iterator.next(); + if (!hasNext()) { + throw new NoSuchElementException("No shard found"); + } + return shards.get(counter++); + } + + @Override public boolean hasNextActive() { + int counter = this.counter; + while (counter < shards.size()) { + if (shards.get(counter++).active()) { + return true; + } + } + return false; + } + + @Override public ShardRouting nextActive() throws NoSuchElementException { + ShardRouting shardRouting = nextActiveOrNull(); + if (shardRouting == null) { + throw new NoSuchElementException("No active shard found"); + } + return shardRouting; + } + + @Override public ShardRouting nextActiveOrNull() throws NoSuchElementException { + while (counter < shards.size()) { + ShardRouting shardRouting = shards.get(counter++); + if (shardRouting.active()) { + return shardRouting; + } + } + return null; } @Override public void remove() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ShardsIterator.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ShardsIterator.java index 9a372a2854d..ab2e1409742 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ShardsIterator.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ShardsIterator.java @@ -22,6 +22,7 @@ package org.elasticsearch.cluster.routing; import org.elasticsearch.index.shard.ShardId; import java.util.Iterator; +import java.util.NoSuchElementException; /** * @author kimchy (Shay Banon) @@ -35,5 +36,13 @@ public interface ShardsIterator extends Iterable, Iterator