diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java index c0653f61185..84813c01c64 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/broadcast/TransportBroadcastOperationAction.java @@ -95,6 +95,26 @@ public abstract class TransportBroadcastOperationActionNote, if overriding this method, make sure to also override {@link #hasNextShard(org.elasticsearch.cluster.routing.ShardsIterator)}. + */ + protected ShardRouting nextShardOrNull(ShardsIterator shardIt) { + return shardIt.nextActiveOrNull(); + } + + /** + * Allows to override how shard routing is iterated over. Default implementation uses + * {@link ShardsIterator#hasNextActive()}. + * + *

Note, if overriding this method, make sure to also override {@link #nextShardOrNull(org.elasticsearch.cluster.routing.ShardsIterator)}. + */ + protected boolean hasNextShard(ShardsIterator shardIt) { + return shardIt.hasNextActive(); + } + protected boolean accumulateExceptions() { return true; } @@ -153,7 +173,7 @@ public abstract class TransportBroadcastOperationAction { return shardModulo(index++); } + @Override public void remove() { + throw new UnsupportedOperationException(); + } + + @Override public int size() { + return IndexShardRoutingTable.this.size(); + } + + @Override public int sizeActive() { + int shardsActive = 0; + for (ShardRouting shardRouting : IndexShardRoutingTable.this.shards()) { + if (shardRouting.active()) { + shardsActive++; + } + } + return shardsActive; + } + @Override public boolean hasNextActive() { int counter = this.counter; int index = this.index; @@ -200,22 +218,50 @@ public class IndexShardRoutingTable implements Iterable { return null; } - @Override public void remove() { - throw new UnsupportedOperationException(); - } - - @Override public int size() { - return IndexShardRoutingTable.this.size(); - } - - @Override public int sizeActive() { - int shardsActive = 0; + @Override public int sizeAssigned() { + int shardsAssigned = 0; for (ShardRouting shardRouting : IndexShardRoutingTable.this.shards()) { - if (shardRouting.active()) { - shardsActive++; + if (shardRouting.assignedToNode()) { + shardsAssigned++; } } - return shardsActive; + return shardsAssigned; + } + + @Override public boolean hasNextAssigned() { + int counter = this.counter; + int index = this.index; + while (counter++ < size()) { + ShardRouting shardRouting = shardModulo(index++); + if (shardRouting.assignedToNode()) { + return true; + } + } + return false; + } + + @Override public ShardRouting nextAssigned() throws NoSuchElementException { + ShardRouting shardRouting = nextAssignedOrNull(); + if (shardRouting == null) { + throw new NoSuchElementException("No assigned shard found"); + } + return shardRouting; + } + + @Override public ShardRouting nextAssignedOrNull() { + int counter = this.counter; + int index = this.index; + while (counter++ < size()) { + ShardRouting shardRouting = shardModulo(index++); + if (shardRouting.assignedToNode()) { + this.counter = counter; + this.index = index; + return shardRouting; + } + } + this.counter = counter; + this.index = index; + return null; } @Override public ShardId shardId() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java index 59cb297c803..f7a511a188b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/PlainShardsIterator.java @@ -50,16 +50,6 @@ public class PlainShardsIterator implements ShardsIterator { return shards.size(); } - @Override public int sizeActive() { - int sizeActive = 0; - for (ShardRouting shardRouting : shards) { - if (shardRouting.active()) { - sizeActive++; - } - } - return sizeActive; - } - @Override public ShardId shardId() { return this.shardId; } @@ -79,6 +69,16 @@ public class PlainShardsIterator implements ShardsIterator { return shards.get(counter++); } + @Override public int sizeActive() { + int sizeActive = 0; + for (ShardRouting shardRouting : shards) { + if (shardRouting.active()) { + sizeActive++; + } + } + return sizeActive; + } + @Override public boolean hasNextActive() { int counter = this.counter; while (counter < shards.size()) { @@ -107,6 +107,44 @@ public class PlainShardsIterator implements ShardsIterator { return null; } + @Override public int sizeAssigned() { + int sizeAssigned = 0; + for (ShardRouting shardRouting : shards) { + if (shardRouting.assignedToNode()) { + sizeAssigned++; + } + } + return sizeAssigned; + } + + @Override public boolean hasNextAssigned() { + int counter = this.counter; + while (counter < shards.size()) { + if (shards.get(counter++).assignedToNode()) { + return true; + } + } + return false; + } + + @Override public ShardRouting nextAssigned() throws NoSuchElementException { + ShardRouting shardRouting = nextAssignedOrNull(); + if (shardRouting == null) { + throw new NoSuchElementException("No assigned shard found"); + } + return shardRouting; + } + + @Override public ShardRouting nextAssignedOrNull() { + while (counter < shards.size()) { + ShardRouting shardRouting = shards.get(counter++); + if (shardRouting.assignedToNode()) { + return shardRouting; + } + } + return null; + } + @Override public void remove() { throw new UnsupportedOperationException(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ShardsIterator.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ShardsIterator.java index ab2e1409742..3e2ba46a34b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ShardsIterator.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/ShardsIterator.java @@ -25,24 +25,80 @@ import java.util.Iterator; import java.util.NoSuchElementException; /** - * @author kimchy (Shay Banon) + * Allows to iterate over a set of shard instances (routing) within a shard id group. + * + * @author kimchy (shay.banon) */ public interface ShardsIterator extends Iterable, Iterator { + /** + * The shard id this group relates to. + */ + ShardId shardId(); + /** * Resets the iterator. */ ShardsIterator reset(); + /** + * The number of shard routing instances. + */ int size(); + /** + * The number of active shard routing instances. + * + * @see ShardRouting#active() + */ int sizeActive(); - ShardId shardId(); - + /** + * Is there an active shard we can iterate to. + * + * @see ShardRouting#active() + */ boolean hasNextActive(); + /** + * Returns the next active shard, or throws {@link NoSuchElementException}. + * + * @see ShardRouting#active() + */ ShardRouting nextActive() throws NoSuchElementException; + /** + * Returns the next active shard, or null. + * + * @see ShardRouting#active() + */ ShardRouting nextActiveOrNull(); + + /** + * The number of assigned shard routing instances. + * + * @see ShardRouting#assignedToNode() + */ + int sizeAssigned(); + + /** + * Is there an assigned shard we can iterate to. + * + * @see ShardRouting#assignedToNode() + */ + boolean hasNextAssigned(); + + /** + * Returns the next assigned shard, or throws {@link NoSuchElementException}. + * + * @see ShardRouting#assignedToNode() + */ + ShardRouting nextAssigned() throws NoSuchElementException; + + /** + * Returns the next assigned shard, or null. + * + * @see ShardRouting#assignedToNode() + */ + ShardRouting nextAssignedOrNull(); }