more cleaning of shards iterators

This commit is contained in:
kimchy 2010-11-21 14:49:57 +02:00
parent e183fbd6ad
commit 8689e5cf16
3 changed files with 68 additions and 199 deletions

View File

@ -28,9 +28,7 @@ import org.elasticsearch.common.util.concurrent.jsr166y.ThreadLocalRandom;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.common.collect.Lists.*;
@ -91,11 +89,11 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
}
public ShardIterator shardsIt() {
return new IndexShardIterator(0);
return new PlainShardIterator(shardId, shards);
}
public ShardIterator shardsRandomIt() {
return new IndexShardIterator(Math.abs(nextCounter()));
return new PlainShardIterator(shardId, shards, counter.getAndIncrement());
}
public ShardRouting primaryShard() {
@ -129,172 +127,6 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return shards;
}
int nextCounter() {
return counter.getAndIncrement();
}
ShardRouting shardModulo(int counter) {
return shards.get((counter % size()));
}
/**
* <p>The class can be used from different threads, though not designed to be used concurrently
* from different threads.
*/
class IndexShardIterator implements ShardIterator, Iterator<ShardRouting> {
private final int origIndex;
private volatile int index;
private volatile int counter = 0;
private IndexShardIterator(int index) {
this.origIndex = index;
this.index = index;
}
@Override public Iterator<ShardRouting> iterator() {
return this;
}
@Override public ShardIterator reset() {
counter = 0;
index = origIndex;
return this;
}
@Override public boolean hasNext() {
return counter < size();
}
@Override public ShardRouting next() throws NoSuchElementException {
if (!hasNext()) {
throw new NoSuchElementException("No shard found");
}
counter++;
return shardModulo(index++);
}
@Override public void remove() {
throw new UnsupportedOperationException();
}
@Override public int size() {
return IndexShardRoutingTable.this.size();
}
@Override public int sizeActive() {
int shardsActive = 0;
for (ShardRouting shardRouting : IndexShardRoutingTable.this.shards()) {
if (shardRouting.active()) {
shardsActive++;
}
}
return shardsActive;
}
@Override public boolean hasNextActive() {
int counter = this.counter;
int index = this.index;
while (counter++ < size()) {
ShardRouting shardRouting = shardModulo(index++);
if (shardRouting.active()) {
return true;
}
}
return false;
}
@Override public ShardRouting nextActive() throws NoSuchElementException {
ShardRouting shardRouting = nextActiveOrNull();
if (shardRouting == null) {
throw new NoSuchElementException("No active shard found");
}
return shardRouting;
}
@Override public ShardRouting nextActiveOrNull() throws NoSuchElementException {
int counter = this.counter;
int index = this.index;
while (counter++ < size()) {
ShardRouting shardRouting = shardModulo(index++);
if (shardRouting.active()) {
this.counter = counter;
this.index = index;
return shardRouting;
}
}
this.counter = counter;
this.index = index;
return null;
}
@Override public int sizeAssigned() {
int shardsAssigned = 0;
for (ShardRouting shardRouting : IndexShardRoutingTable.this.shards()) {
if (shardRouting.assignedToNode()) {
shardsAssigned++;
}
}
return shardsAssigned;
}
@Override public boolean hasNextAssigned() {
int counter = this.counter;
int index = this.index;
while (counter++ < size()) {
ShardRouting shardRouting = shardModulo(index++);
if (shardRouting.assignedToNode()) {
return true;
}
}
return false;
}
@Override public ShardRouting nextAssigned() throws NoSuchElementException {
ShardRouting shardRouting = nextAssignedOrNull();
if (shardRouting == null) {
throw new NoSuchElementException("No assigned shard found");
}
return shardRouting;
}
@Override public ShardRouting nextAssignedOrNull() {
int counter = this.counter;
int index = this.index;
while (counter++ < size()) {
ShardRouting shardRouting = shardModulo(index++);
if (shardRouting.assignedToNode()) {
this.counter = counter;
this.index = index;
return shardRouting;
}
}
this.counter = counter;
this.index = index;
return null;
}
@Override public ShardId shardId() {
return IndexShardRoutingTable.this.shardId();
}
@Override public boolean equals(Object o) {
if (this == o) return true;
ShardIterator that = (ShardIterator) o;
if (shardId != null ? !shardId.equals(that.shardId()) : that.shardId() != null) return false;
return true;
}
@Override public int hashCode() {
return shardId != null ? shardId.hashCode() : 0;
}
}
public static class Builder {
private ShardId shardId;

View File

@ -35,6 +35,11 @@ public class PlainShardIterator extends PlainShardsIterator implements ShardIter
this.shardId = shardId;
}
public PlainShardIterator(ShardId shardId, List<ShardRouting> shards, int index) {
super(shards, index);
this.shardId = shardId;
}
@Override public ShardIterator reset() {
super.reset();
return this;

View File

@ -30,50 +30,68 @@ public class PlainShardsIterator implements ShardsIterator {
protected final List<ShardRouting> shards;
private final int origIndex;
private volatile int index;
private volatile int counter = 0;
public PlainShardsIterator(List<ShardRouting> shards) {
this(shards, 0);
}
public PlainShardsIterator(List<ShardRouting> shards, int index) {
this.shards = shards;
}
@Override public ShardsIterator reset() {
this.counter = 0;
return this;
}
@Override public int size() {
return shards.size();
this.index = Math.abs(index);
this.origIndex = this.index;
}
@Override public Iterator<ShardRouting> iterator() {
return this;
}
@Override public boolean hasNext() {
return counter < shards.size();
@Override public ShardsIterator reset() {
counter = 0;
index = origIndex;
return this;
}
@Override public ShardRouting next() {
@Override public boolean hasNext() {
return counter < size();
}
@Override public ShardRouting next() throws NoSuchElementException {
if (!hasNext()) {
throw new NoSuchElementException("No shard found");
}
return shards.get(counter++);
counter++;
return shardModulo(index++);
}
@Override public void remove() {
throw new UnsupportedOperationException();
}
@Override public int size() {
return shards.size();
}
@Override public int sizeActive() {
int sizeActive = 0;
int shardsActive = 0;
for (ShardRouting shardRouting : shards) {
if (shardRouting.active()) {
sizeActive++;
shardsActive++;
}
}
return sizeActive;
return shardsActive;
}
@Override public boolean hasNextActive() {
int counter = this.counter;
while (counter < shards.size()) {
if (shards.get(counter++).active()) {
int index = this.index;
while (counter++ < size()) {
ShardRouting shardRouting = shardModulo(index++);
if (shardRouting.active()) {
return true;
}
}
@ -89,29 +107,37 @@ public class PlainShardsIterator implements ShardsIterator {
}
@Override public ShardRouting nextActiveOrNull() throws NoSuchElementException {
while (counter < shards.size()) {
ShardRouting shardRouting = shards.get(counter++);
int counter = this.counter;
int index = this.index;
while (counter++ < size()) {
ShardRouting shardRouting = shardModulo(index++);
if (shardRouting.active()) {
this.counter = counter;
this.index = index;
return shardRouting;
}
}
this.counter = counter;
this.index = index;
return null;
}
@Override public int sizeAssigned() {
int sizeAssigned = 0;
int shardsAssigned = 0;
for (ShardRouting shardRouting : shards) {
if (shardRouting.assignedToNode()) {
sizeAssigned++;
shardsAssigned++;
}
}
return sizeAssigned;
return shardsAssigned;
}
@Override public boolean hasNextAssigned() {
int counter = this.counter;
while (counter < shards.size()) {
if (shards.get(counter++).assignedToNode()) {
int index = this.index;
while (counter++ < size()) {
ShardRouting shardRouting = shardModulo(index++);
if (shardRouting.assignedToNode()) {
return true;
}
}
@ -127,16 +153,22 @@ public class PlainShardsIterator implements ShardsIterator {
}
@Override public ShardRouting nextAssignedOrNull() {
while (counter < shards.size()) {
ShardRouting shardRouting = shards.get(counter++);
int counter = this.counter;
int index = this.index;
while (counter++ < size()) {
ShardRouting shardRouting = shardModulo(index++);
if (shardRouting.assignedToNode()) {
this.counter = counter;
this.index = index;
return shardRouting;
}
}
this.counter = counter;
this.index = index;
return null;
}
@Override public void remove() {
throw new UnsupportedOperationException();
ShardRouting shardModulo(int counter) {
return shards.get((counter % size()));
}
}