add ability to iterate over assigned shards, and add an extension point to control shard routing iteration in the broadcast based action support

This commit is contained in:
kimchy 2010-08-17 14:04:12 +03:00
parent 7833cb1c76
commit 96fc16dddb
4 changed files with 191 additions and 31 deletions

View File

@ -95,6 +95,26 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
protected abstract GroupShardsIterator shards(Request request, ClusterState clusterState); protected abstract GroupShardsIterator shards(Request request, ClusterState clusterState);
/**
* Allows to override how shard routing is iterated over. Default implementation uses
* {@link ShardsIterator#nextActiveOrNull()}.
*
* <p>Note, if overriding this method, make sure to also override {@link #hasNextShard(org.elasticsearch.cluster.routing.ShardsIterator)}.
*/
protected ShardRouting nextShardOrNull(ShardsIterator shardIt) {
return shardIt.nextActiveOrNull();
}
/**
* Allows to override how shard routing is iterated over. Default implementation uses
* {@link ShardsIterator#hasNextActive()}.
*
* <p>Note, if overriding this method, make sure to also override {@link #nextShardOrNull(org.elasticsearch.cluster.routing.ShardsIterator)}.
*/
protected boolean hasNextShard(ShardsIterator shardIt) {
return shardIt.hasNextActive();
}
protected boolean accumulateExceptions() { protected boolean accumulateExceptions() {
return true; return true;
} }
@ -153,7 +173,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
// count the local operations, and perform the non local ones // count the local operations, and perform the non local ones
int localOperations = 0; int localOperations = 0;
for (final ShardsIterator shardIt : shardsIts) { for (final ShardsIterator shardIt : shardsIts) {
final ShardRouting shard = shardIt.nextActiveOrNull(); final ShardRouting shard = nextShardOrNull(shardIt);
if (shard != null) { if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) { if (shard.currentNodeId().equals(nodes.localNodeId())) {
localOperations++; localOperations++;
@ -173,7 +193,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
threadPool.execute(new Runnable() { threadPool.execute(new Runnable() {
@Override public void run() { @Override public void run() {
for (final ShardsIterator shardIt : shardsIts) { for (final ShardsIterator shardIt : shardsIts) {
final ShardRouting shard = shardIt.reset().nextActiveOrNull(); final ShardRouting shard = nextShardOrNull(shardIt.reset());
if (shard != null) { if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) { if (shard.currentNodeId().equals(nodes.localNodeId())) {
performOperation(shardIt.reset(), false); performOperation(shardIt.reset(), false);
@ -188,7 +208,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
request.beforeLocalFork(); request.beforeLocalFork();
} }
for (final ShardsIterator shardIt : shardsIts) { for (final ShardsIterator shardIt : shardsIts) {
final ShardRouting shard = shardIt.reset().nextActiveOrNull(); final ShardRouting shard = nextShardOrNull(shardIt.reset());
if (shard != null) { if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) { if (shard.currentNodeId().equals(nodes.localNodeId())) {
performOperation(shardIt.reset(), localAsync); performOperation(shardIt.reset(), localAsync);
@ -200,7 +220,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
} }
private void performOperation(final ShardsIterator shardIt, boolean localAsync) { private void performOperation(final ShardsIterator shardIt, boolean localAsync) {
final ShardRouting shard = shardIt.nextActiveOrNull(); final ShardRouting shard = nextShardOrNull(shardIt);
if (shard == null) { if (shard == null) {
// no more active shards... (we should not really get here, just safety) // no more active shards... (we should not really get here, just safety)
onOperation(shard, shardIt, null, false); onOperation(shard, shardIt, null, false);
@ -263,7 +283,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
@SuppressWarnings({"unchecked"}) @SuppressWarnings({"unchecked"})
private void onOperation(ShardRouting shard, final ShardsIterator shardIt, Throwable t, boolean alreadyThreaded) { private void onOperation(ShardRouting shard, final ShardsIterator shardIt, Throwable t, boolean alreadyThreaded) {
if (!shardIt.hasNextActive()) { if (!hasNextShard(shardIt)) {
// e is null when there is no next active.... // e is null when there is no next active....
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
if (t != null) { if (t != null) {

View File

@ -164,6 +164,24 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return shardModulo(index++); return shardModulo(index++);
} }
@Override public void remove() {
throw new UnsupportedOperationException();
}
@Override public int size() {
return IndexShardRoutingTable.this.size();
}
@Override public int sizeActive() {
int shardsActive = 0;
for (ShardRouting shardRouting : IndexShardRoutingTable.this.shards()) {
if (shardRouting.active()) {
shardsActive++;
}
}
return shardsActive;
}
@Override public boolean hasNextActive() { @Override public boolean hasNextActive() {
int counter = this.counter; int counter = this.counter;
int index = this.index; int index = this.index;
@ -200,22 +218,50 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return null; return null;
} }
@Override public void remove() { @Override public int sizeAssigned() {
throw new UnsupportedOperationException(); int shardsAssigned = 0;
}
@Override public int size() {
return IndexShardRoutingTable.this.size();
}
@Override public int sizeActive() {
int shardsActive = 0;
for (ShardRouting shardRouting : IndexShardRoutingTable.this.shards()) { for (ShardRouting shardRouting : IndexShardRoutingTable.this.shards()) {
if (shardRouting.active()) { if (shardRouting.assignedToNode()) {
shardsActive++; shardsAssigned++;
} }
} }
return shardsActive; return shardsAssigned;
}
@Override public boolean hasNextAssigned() {
int counter = this.counter;
int index = this.index;
while (counter++ < size()) {
ShardRouting shardRouting = shardModulo(index++);
if (shardRouting.assignedToNode()) {
return true;
}
}
return false;
}
@Override public ShardRouting nextAssigned() throws NoSuchElementException {
ShardRouting shardRouting = nextAssignedOrNull();
if (shardRouting == null) {
throw new NoSuchElementException("No assigned shard found");
}
return shardRouting;
}
@Override public ShardRouting nextAssignedOrNull() {
int counter = this.counter;
int index = this.index;
while (counter++ < size()) {
ShardRouting shardRouting = shardModulo(index++);
if (shardRouting.assignedToNode()) {
this.counter = counter;
this.index = index;
return shardRouting;
}
}
this.counter = counter;
this.index = index;
return null;
} }
@Override public ShardId shardId() { @Override public ShardId shardId() {

View File

@ -50,16 +50,6 @@ public class PlainShardsIterator implements ShardsIterator {
return shards.size(); return shards.size();
} }
@Override public int sizeActive() {
int sizeActive = 0;
for (ShardRouting shardRouting : shards) {
if (shardRouting.active()) {
sizeActive++;
}
}
return sizeActive;
}
@Override public ShardId shardId() { @Override public ShardId shardId() {
return this.shardId; return this.shardId;
} }
@ -79,6 +69,16 @@ public class PlainShardsIterator implements ShardsIterator {
return shards.get(counter++); return shards.get(counter++);
} }
@Override public int sizeActive() {
int sizeActive = 0;
for (ShardRouting shardRouting : shards) {
if (shardRouting.active()) {
sizeActive++;
}
}
return sizeActive;
}
@Override public boolean hasNextActive() { @Override public boolean hasNextActive() {
int counter = this.counter; int counter = this.counter;
while (counter < shards.size()) { while (counter < shards.size()) {
@ -107,6 +107,44 @@ public class PlainShardsIterator implements ShardsIterator {
return null; return null;
} }
@Override public int sizeAssigned() {
int sizeAssigned = 0;
for (ShardRouting shardRouting : shards) {
if (shardRouting.assignedToNode()) {
sizeAssigned++;
}
}
return sizeAssigned;
}
@Override public boolean hasNextAssigned() {
int counter = this.counter;
while (counter < shards.size()) {
if (shards.get(counter++).assignedToNode()) {
return true;
}
}
return false;
}
@Override public ShardRouting nextAssigned() throws NoSuchElementException {
ShardRouting shardRouting = nextAssignedOrNull();
if (shardRouting == null) {
throw new NoSuchElementException("No assigned shard found");
}
return shardRouting;
}
@Override public ShardRouting nextAssignedOrNull() {
while (counter < shards.size()) {
ShardRouting shardRouting = shards.get(counter++);
if (shardRouting.assignedToNode()) {
return shardRouting;
}
}
return null;
}
@Override public void remove() { @Override public void remove() {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }

View File

@ -25,24 +25,80 @@ import java.util.Iterator;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
/** /**
* @author kimchy (Shay Banon) * Allows to iterate over a set of shard instances (routing) within a shard id group.
*
* @author kimchy (shay.banon)
*/ */
public interface ShardsIterator extends Iterable<ShardRouting>, Iterator<ShardRouting> { public interface ShardsIterator extends Iterable<ShardRouting>, Iterator<ShardRouting> {
/**
* The shard id this group relates to.
*/
ShardId shardId();
/** /**
* Resets the iterator. * Resets the iterator.
*/ */
ShardsIterator reset(); ShardsIterator reset();
/**
* The number of shard routing instances.
*/
int size(); int size();
/**
* The number of active shard routing instances.
*
* @see ShardRouting#active()
*/
int sizeActive(); int sizeActive();
ShardId shardId(); /**
* Is there an active shard we can iterate to.
*
* @see ShardRouting#active()
*/
boolean hasNextActive(); boolean hasNextActive();
/**
* Returns the next active shard, or throws {@link NoSuchElementException}.
*
* @see ShardRouting#active()
*/
ShardRouting nextActive() throws NoSuchElementException; ShardRouting nextActive() throws NoSuchElementException;
/**
* Returns the next active shard, or <tt>null</tt>.
*
* @see ShardRouting#active()
*/
ShardRouting nextActiveOrNull(); ShardRouting nextActiveOrNull();
/**
* The number of assigned shard routing instances.
*
* @see ShardRouting#assignedToNode()
*/
int sizeAssigned();
/**
* Is there an assigned shard we can iterate to.
*
* @see ShardRouting#assignedToNode()
*/
boolean hasNextAssigned();
/**
* Returns the next assigned shard, or throws {@link NoSuchElementException}.
*
* @see ShardRouting#assignedToNode()
*/
ShardRouting nextAssigned() throws NoSuchElementException;
/**
* Returns the next assigned shard, or <tt>null</tt>.
*
* @see ShardRouting#assignedToNode()
*/
ShardRouting nextAssignedOrNull();
} }