optimize shard iteration logic

This commit is contained in:
Shay Banon 2011-07-25 12:48:24 +03:00
parent 31ea01bbc6
commit 549e9c7019
26 changed files with 524 additions and 434 deletions

View File

@ -53,7 +53,7 @@ public class TransportSinglePingAction extends TransportShardSingleOperationActi
@Override protected ShardIterator shards(ClusterState clusterState, SinglePingRequest request) throws ElasticSearchException {
return clusterService.operationRouting()
.indexShards(clusterService.state(), request.index(), request.type, request.id, null);
.getShards(clusterService.state(), request.index(), request.type, request.id, null, null);
}
@Override protected SinglePingResponse shardOperation(SinglePingRequest request, int shardId) throws ElasticSearchException {

View File

@ -80,7 +80,7 @@ public class TransportAnalyzeAction extends TransportSingleCustomOperationAction
@Override protected ShardsIterator shards(ClusterState clusterState, AnalyzeRequest request) {
request.index(clusterState.metaData().concreteIndex(request.index()));
return clusterState.routingTable().index(request.index()).randomAllShardsIt();
return clusterState.routingTable().index(request.index()).randomAllActiveShardsIt();
}
@Override protected AnalyzeResponse shardOperation(AnalyzeRequest request, int shardId) throws ElasticSearchException {

View File

@ -143,6 +143,6 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio
* The refresh request works against *all* shards.
*/
@Override protected GroupShardsIterator shards(ClearIndicesCacheRequest request, String[] concreteIndices, ClusterState clusterState) {
return clusterState.routingTable().allShardsGrouped(concreteIndices);
return clusterState.routingTable().allActiveShardsGrouped(concreteIndices, true);
}
}

View File

@ -119,6 +119,6 @@ public class TransportFlushAction extends TransportBroadcastOperationAction<Flus
* The refresh request works against *all* shards.
*/
@Override protected GroupShardsIterator shards(FlushRequest request, String[] concreteIndices, ClusterState clusterState) {
return clusterState.routingTable().allShardsGrouped(concreteIndices);
return clusterState.routingTable().allActiveShardsGrouped(concreteIndices, true);
}
}

View File

@ -117,6 +117,6 @@ public class TransportGatewaySnapshotAction extends TransportBroadcastOperationA
* The snapshot request works against all primary shards.
*/
@Override protected GroupShardsIterator shards(GatewaySnapshotRequest request, String[] concreteIndices, ClusterState clusterState) {
return clusterState.routingTable().primaryShardsGrouped(concreteIndices);
return clusterState.routingTable().activePrimaryShardsGrouped(concreteIndices, true);
}
}

View File

@ -130,6 +130,6 @@ public class TransportOptimizeAction extends TransportBroadcastOperationAction<O
* The refresh request works against *all* shards.
*/
@Override protected GroupShardsIterator shards(OptimizeRequest request, String[] concreteIndices, ClusterState clusterState) {
return clusterState.routingTable().allShardsGrouped(concreteIndices);
return clusterState.routingTable().allActiveShardsGrouped(concreteIndices, true);
}
}

View File

@ -120,6 +120,6 @@ public class TransportRefreshAction extends TransportBroadcastOperationAction<Re
* The refresh request works against *all* shards.
*/
@Override protected GroupShardsIterator shards(RefreshRequest request, String[] concreteIndices, ClusterState clusterState) {
return clusterState.routingTable().allShardsGrouped(concreteIndices);
return clusterState.routingTable().allActiveShardsGrouped(concreteIndices, true);
}
}

View File

@ -29,7 +29,6 @@ import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAct
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -81,31 +80,10 @@ public class TransportIndicesSegmentsAction extends TransportBroadcastOperationA
}
/**
* Segments goes across *all* shards.
* Segments goes across *all* active shards.
*/
@Override protected GroupShardsIterator shards(IndicesSegmentsRequest request, String[] concreteIndices, ClusterState clusterState) {
return clusterState.routingTable().allShardsGrouped(concreteIndices);
}
/**
* We want to go over all assigned nodes (to get recovery status) and not just active ones.
*/
@Override protected ShardRouting nextShardOrNull(ShardIterator shardIt) {
return shardIt.nextAssignedOrNull();
}
/**
* We want to go over all assigned nodes (to get recovery status) and not just active ones.
*/
@Override protected ShardRouting firstShardOrNull(ShardIterator shardIt) {
return shardIt.firstAssignedOrNull();
}
/**
* We want to go over all assigned nodes (to get recovery status) and not just active ones.
*/
@Override protected boolean hasNextShard(ShardIterator shardIt) {
return shardIt.hasNextAssigned();
return clusterState.routingTable().allActiveShardsGrouped(concreteIndices, true);
}
@Override protected IndicesSegmentResponse newResponse(IndicesSegmentsRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {

View File

@ -29,7 +29,6 @@ import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAct
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -93,28 +92,7 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
* Status goes across *all* shards.
*/
@Override protected GroupShardsIterator shards(IndicesStatusRequest request, String[] concreteIndices, ClusterState clusterState) {
return clusterState.routingTable().allShardsGrouped(concreteIndices);
}
/**
* We want to go over all assigned nodes (to get recovery status) and not just active ones.
*/
@Override protected ShardRouting nextShardOrNull(ShardIterator shardIt) {
return shardIt.nextAssignedOrNull();
}
/**
* We want to go over all assigned nodes (to get recovery status) and not just active ones.
*/
@Override protected ShardRouting firstShardOrNull(ShardIterator shardIt) {
return shardIt.firstAssignedOrNull();
}
/**
* We want to go over all assigned nodes (to get recovery status) and not just active ones.
*/
@Override protected boolean hasNextShard(ShardIterator shardIt) {
return shardIt.hasNextAssigned();
return clusterState.routingTable().allAssignedShardsGrouped(concreteIndices, true);
}
@Override protected IndicesStatusResponse newResponse(IndicesStatusRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {

View File

@ -69,7 +69,7 @@ public class TransportPercolateAction extends TransportSingleCustomOperationActi
@Override protected ShardsIterator shards(ClusterState clusterState, PercolateRequest request) {
request.index(clusterState.metaData().concreteIndex(request.index()));
return clusterState.routingTable().index(request.index()).randomAllShardsIt();
return clusterState.routingTable().index(request.index()).randomAllActiveShardsIt();
}
@Override protected PercolateResponse shardOperation(PercolateRequest request, int shardId) throws ElasticSearchException {

View File

@ -121,7 +121,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
shardsIts = clusterService.operationRouting().searchShards(clusterState, request.indices(), concreteIndices, request.queryHint(), routingMap, request.preference());
expectedSuccessfulOps = shardsIts.size();
// we need to add 1 for non active partition, since we count it in the total!
expectedTotalOps = shardsIts.totalSizeActiveWith1ForEmpty();
expectedTotalOps = shardsIts.totalSizeWith1ForEmpty();
if (expectedSuccessfulOps == 0) {
// not search shards to search on...
@ -133,7 +133,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
// count the local operations, and perform the non local ones
int localOperations = 0;
for (final ShardIterator shardIt : shardsIts) {
final ShardRouting shard = shardIt.firstActiveOrNull();
final ShardRouting shard = shardIt.firstOrNull();
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
localOperations++;
@ -153,7 +153,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
threadPool.executor(ThreadPool.Names.SEARCH).execute(new Runnable() {
@Override public void run() {
for (final ShardIterator shardIt : shardsIts) {
final ShardRouting shard = shardIt.firstActiveOrNull();
final ShardRouting shard = shardIt.firstOrNull();
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
performFirstPhase(shardIt);
@ -168,7 +168,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
request.beforeLocalFork();
}
for (final ShardIterator shardIt : shardsIts) {
final ShardRouting shard = shardIt.firstActiveOrNull();
final ShardRouting shard = shardIt.firstOrNull();
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
if (localAsync) {
@ -188,7 +188,10 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
}
private void performFirstPhase(final ShardIterator shardIt) {
final ShardRouting shard = shardIt.nextActiveOrNull();
performFirstPhase(shardIt, shardIt.nextOrNull());
}
private void performFirstPhase(final ShardIterator shardIt, final ShardRouting shard) {
if (shard == null) {
// no more active shards... (we should not really get here, but just for safety)
onFirstPhaseResult(null, shardIt, null);
@ -216,12 +219,9 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
processFirstPhaseResult(shard, result);
// increment all the "future" shards to update the total ops since we some may work and some may not...
// and when that happens, we break on total ops, so we must maintain them
while (shardIt.hasNextActive()) {
totalOps.incrementAndGet();
shardIt.nextActive();
}
if (successulOps.incrementAndGet() == expectedSuccessfulOps ||
totalOps.incrementAndGet() == expectedTotalOps) {
int xTotalOps = totalOps.addAndGet(shardIt.remaining() + 1);
successulOps.incrementAndGet();
if (xTotalOps == expectedTotalOps) {
try {
moveToSecondPhase();
} catch (Exception e) {
@ -263,7 +263,8 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
}
}
} else {
if (shardIt.hasNextActive()) {
ShardRouting nextShard = shardIt.nextOrNull();
if (nextShard != null) {
// trace log this exception
if (logger.isTraceEnabled()) {
if (t != null) {
@ -274,7 +275,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
}
}
}
performFirstPhase(shardIt);
performFirstPhase(shardIt, nextShard);
} else {
// no more shards active, add a failure
// e is null when there is no next active....

View File

@ -95,31 +95,6 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
protected abstract GroupShardsIterator shards(Request request, String[] concreteIndices, ClusterState clusterState);
/**
* Allows to override how shard routing is iterated over. Default implementation uses
* {@link org.elasticsearch.cluster.routing.ShardIterator#nextActiveOrNull()}.
*
* <p>Note, if overriding this method, make sure to also override {@link #hasNextShard(org.elasticsearch.cluster.routing.ShardIterator)},
* and {@link #firstShardOrNull(org.elasticsearch.cluster.routing.ShardIterator)}.
*/
protected ShardRouting nextShardOrNull(ShardIterator shardIt) {
return shardIt.nextActiveOrNull();
}
protected ShardRouting firstShardOrNull(ShardIterator shardIt) {
return shardIt.firstActiveOrNull();
}
/**
* Allows to override how shard routing is iterated over. Default implementation uses
* {@link org.elasticsearch.cluster.routing.ShardIterator#hasNextActive()}.
*
* <p>Note, if overriding this method, make sure to also override {@link #nextShardOrNull(org.elasticsearch.cluster.routing.ShardIterator)}.
*/
protected boolean hasNextShard(ShardIterator shardIt) {
return shardIt.hasNextActive();
}
protected boolean accumulateExceptions() {
return true;
}
@ -180,7 +155,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
// count the local operations, and perform the non local ones
int localOperations = 0;
for (final ShardIterator shardIt : shardsIts) {
final ShardRouting shard = firstShardOrNull(shardIt);
final ShardRouting shard = shardIt.firstOrNull();
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
localOperations++;
@ -200,7 +175,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
threadPool.executor(executor).execute(new Runnable() {
@Override public void run() {
for (final ShardIterator shardIt : shardsIts) {
final ShardRouting shard = firstShardOrNull(shardIt);
final ShardRouting shard = shardIt.firstOrNull();
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
performOperation(shardIt, false);
@ -215,7 +190,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
request.beforeLocalFork();
}
for (final ShardIterator shardIt : shardsIts) {
final ShardRouting shard = firstShardOrNull(shardIt);
final ShardRouting shard = shardIt.firstOrNull();
if (shard != null) {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
performOperation(shardIt, localAsync);
@ -227,7 +202,10 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
}
private void performOperation(final ShardIterator shardIt, boolean localAsync) {
final ShardRouting shard = nextShardOrNull(shardIt);
performOperation(shardIt, shardIt.nextOrNull(), localAsync);
}
private void performOperation(final ShardIterator shardIt, final ShardRouting shard, boolean localAsync) {
if (shard == null) {
// no more active shards... (we should not really get here, just safety)
onOperation(null, shardIt, null);
@ -289,7 +267,24 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
@SuppressWarnings({"unchecked"})
private void onOperation(@Nullable ShardRouting shard, final ShardIterator shardIt, Throwable t) {
if (!hasNextShard(shardIt)) {
ShardRouting nextShard = shardIt.nextOrNull();
if (nextShard != null) {
// trace log this exception
if (logger.isTraceEnabled()) {
if (t != null) {
if (shard != null) {
logger.trace(shard.shortSummary() + ": Failed to execute [" + request + "]", t);
} else {
logger.trace(shardIt.shardId() + ": Failed to execute [" + request + "]", t);
}
}
}
// we are not threaded here if we got here from the transport
// or we possibly threaded if we got from a local threaded one,
// in which case, the next shard in the partition will not be local one
// so there is no meaning to this flag
performOperation(shardIt, nextShard, true);
} else {
// e is null when there is no next active....
if (logger.isDebugEnabled()) {
if (t != null) {
@ -315,24 +310,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
if (expectedOps == counterOps.incrementAndGet()) {
finishHim();
}
return;
} else {
// trace log this exception
if (logger.isTraceEnabled()) {
if (t != null) {
if (shard != null) {
logger.trace(shard.shortSummary() + ": Failed to execute [" + request + "]", t);
} else {
logger.trace(shardIt.shardId() + ": Failed to execute [" + request + "]", t);
}
}
}
}
// we are not threaded here if we got here from the transport
// or we possibly threaded if we got from a local threaded one,
// in which case, the next shard in the partition will not be local one
// so there is no meaning to this flag
performOperation(shardIt, true);
}
private void finishHim() {

View File

@ -51,7 +51,14 @@ import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.VoidTransportResponseHandler;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
@ -289,7 +296,9 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
boolean foundPrimary = false;
for (final ShardRouting shard : shardIt) {
ShardRouting shardX;
while ((shardX = shardIt.nextOrNull()) != null) {
final ShardRouting shard = shardX;
// we only deal with primary shardIt here...
if (!shard.primary()) {
continue;
@ -312,6 +321,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
} else if (consistencyLevel == WriteConsistencyLevel.ALL) {
requiredNumber = shardIt.size();
}
if (shardIt.sizeActive() < requiredNumber) {
retry(fromClusterEvent, shard.shardId());
return false;
@ -368,8 +378,8 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
// we should never get here, but here we go
if (!foundPrimary) {
final UnavailableShardsException failure = new UnavailableShardsException(shardIt.shardId(), request.toString());
listener.onFailure(failure);
retry(fromClusterEvent, shardIt.shardId());
return false;
}
return true;
}
@ -440,30 +450,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
}
// initialize the counter
int replicaCounter = 0;
for (final ShardRouting shard : shardIt.reset()) {
// if its unassigned, nothing to do here...
if (shard.unassigned()) {
continue;
}
// if the shard is primary and relocating, add one to the counter since we perform it on the replica as well
// (and we already did it on the primary)
if (shard.primary()) {
if (shard.relocating()) {
replicaCounter++;
}
} else {
replicaCounter++;
// if we are relocating the replica, we want to perform the index operation on both the relocating
// shard and the target shard. This means that we won't loose index operations between end of recovery
// and reassignment of the shard by the master node
if (shard.relocating()) {
replicaCounter++;
}
}
}
int replicaCounter = shardIt.assignedReplicasIncludingRelocating();
if (replicaCounter == 0) {
postPrimaryOperation(request, response);
@ -483,7 +470,9 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
replicaCounter++;
AtomicInteger counter = new AtomicInteger(replicaCounter);
for (final ShardRouting shard : shardIt.reset()) {
shardIt.reset(); // reset the iterator
ShardRouting shard;
while ((shard = shardIt.nextOrNull()) != null) {
// if its unassigned, nothing to do here...
if (shard.unassigned()) {
continue;

View File

@ -35,7 +35,11 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.BaseTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
@ -124,9 +128,12 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
*/
private void performFirst() {
if (request.preferLocalShard()) {
while (shardsIt.hasNextActive()) {
final ShardRouting shard = shardsIt.nextActive();
boolean foundLocal = false;
ShardRouting shardX;
while ((shardX = shardsIt.nextOrNull()) != null) {
final ShardRouting shard = shardX;
if (shard.currentNodeId().equals(nodes.localNodeId())) {
foundLocal = true;
if (request.operationThreaded()) {
request.beforeLocalFork();
threadPool.executor(executor()).execute(new Runnable() {
@ -151,20 +158,29 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
}
}
}
if (!foundLocal) {
// no local node get, go remote
shardsIt.reset();
perform(null);
}
} else {
perform(null);
}
if (!shardsIt.hasNextActive()) {
// no local node get, go remote
shardsIt.reset();
perform(null);
}
}
private void perform(final Exception lastException) {
while (shardsIt.hasNextActive()) {
final ShardRouting shard = shardsIt.nextActive();
// no need to check for local nodes, we tried them already in performFirstGet
final ShardRouting shard = shardsIt.nextOrNull();
if (shard == null) {
Exception failure = lastException;
if (failure == null) {
failure = new NoShardAvailableActionException(null, "No shard available for [" + request + "]");
} else {
if (logger.isDebugEnabled()) {
logger.debug("failed to execute [" + request + "]", failure);
}
}
listener.onFailure(failure);
} else {
if (shard.currentNodeId().equals(nodes.localNodeId())) {
// we don't prefer local shard, so try and do it here
if (!request.preferLocalShard()) {
@ -180,12 +196,10 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
}
}
});
return;
} else {
try {
final Response response = shardOperation(request, shard.id());
listener.onResponse(response);
return;
} catch (Exception e) {
onFailure(shard, e);
}
@ -210,20 +224,8 @@ public abstract class TransportSingleCustomOperationAction<Request extends Singl
onFailure(shard, exp);
}
});
return;
}
}
if (!shardsIt.hasNextActive()) {
Exception failure = lastException;
if (failure == null) {
failure = new NoShardAvailableActionException(null, "No shard available for [" + request + "]");
} else {
if (logger.isDebugEnabled()) {
logger.debug("failed to execute [" + request + "]", failure);
}
}
listener.onFailure(failure);
}
}
}

View File

@ -92,7 +92,7 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
protected abstract ShardIterator shards(ClusterState clusterState, Request request) throws ElasticSearchException;
private class AsyncSingleAction {
class AsyncSingleAction {
private final ActionListener<Response> listener;
@ -129,7 +129,7 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
}
private void perform(@Nullable final Exception lastException) {
final ShardRouting shardRouting = shardIt.nextActiveOrNull();
final ShardRouting shardRouting = shardIt.nextOrNull();
if (shardRouting == null) {
Exception failure = lastException;
if (failure == null) {

View File

@ -41,10 +41,10 @@ public class GroupShardsIterator implements Iterable<ShardIterator> {
return size;
}
public int totalSizeActiveWith1ForEmpty() {
public int totalSizeWith1ForEmpty() {
int size = 0;
for (ShardIterator shard : iterators) {
int sizeActive = shard.sizeActive();
int sizeActive = shard.size();
if (sizeActive == 0) {
size += 1;
} else {
@ -54,14 +54,6 @@ public class GroupShardsIterator implements Iterable<ShardIterator> {
return size;
}
public int totalSizeActive() {
int size = 0;
for (ShardIterator shard : iterators) {
size += shard.sizeActive();
}
return size;
}
public int size() {
return iterators.size();
}

View File

@ -52,6 +52,7 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
private final ImmutableMap<Integer, IndexShardRoutingTable> shards;
private final ImmutableList<ShardRouting> allShards;
private final ImmutableList<ShardRouting> allActiveShards;
private final AtomicInteger counter = new AtomicInteger();
@ -59,12 +60,17 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
this.index = index;
this.shards = ImmutableMap.copyOf(shards);
ImmutableList.Builder<ShardRouting> allShards = ImmutableList.builder();
ImmutableList.Builder<ShardRouting> allActiveShards = ImmutableList.builder();
for (IndexShardRoutingTable indexShardRoutingTable : shards.values()) {
for (ShardRouting shardRouting : indexShardRoutingTable) {
allShards.add(shardRouting);
if (shardRouting.active()) {
allActiveShards.add(shardRouting);
}
}
}
this.allShards = allShards.build();
this.allActiveShards = allActiveShards.build();
}
public String index() {
@ -198,6 +204,10 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
return new PlainShardsIterator(allShards, counter.incrementAndGet());
}
public ShardsIterator randomAllActiveShardsIt() {
return new PlainShardsIterator(allActiveShards, counter.incrementAndGet());
}
/**
* A group shards iterator where each group ({@link ShardIterator}
* is an iterator across shard replication group.

View File

@ -20,7 +20,6 @@
package org.elasticsearch.cluster.routing;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.collect.UnmodifiableIterator;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -41,7 +40,12 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
final ShardId shardId;
final ShardRouting primary;
final ImmutableList<ShardRouting> primaryAsList;
final ImmutableList<ShardRouting> replicas;
final ImmutableList<ShardRouting> shards;
final ImmutableList<ShardRouting> activeShards;
final ImmutableList<ShardRouting> assignedShards;
final AtomicInteger counter;
@ -52,6 +56,35 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
this.shards = shards;
this.allocatedPostApi = allocatedPostApi;
this.counter = new AtomicInteger(ThreadLocalRandom.current().nextInt(shards.size()));
ShardRouting primary = null;
List<ShardRouting> replicas = new ArrayList<ShardRouting>();
List<ShardRouting> activeShards = new ArrayList<ShardRouting>();
List<ShardRouting> assignedShards = new ArrayList<ShardRouting>();
for (ShardRouting shard : shards) {
if (shard.primary()) {
primary = shard;
} else {
replicas.add(shard);
}
if (shard.active()) {
activeShards.add(shard);
}
if (shard.assignedToNode()) {
assignedShards.add(shard);
}
}
this.primary = primary;
if (primary != null) {
this.primaryAsList = ImmutableList.of(primary);
} else {
this.primaryAsList = ImmutableList.of();
}
this.replicas = ImmutableList.copyOf(replicas);
this.activeShards = ImmutableList.copyOf(activeShards);
this.assignedShards = ImmutableList.copyOf(assignedShards);
}
/**
@ -117,13 +150,29 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
}
public ImmutableList<ShardRouting> shards() {
return shards;
return this.shards;
}
public ImmutableList<ShardRouting> getShards() {
return shards();
}
public ImmutableList<ShardRouting> activeShards() {
return this.activeShards;
}
public ImmutableList<ShardRouting> getActiveShards() {
return activeShards();
}
public ImmutableList<ShardRouting> assignedShards() {
return this.assignedShards;
}
public ImmutableList<ShardRouting> getAssignedShards() {
return this.assignedShards;
}
public int countWithState(ShardRoutingState state) {
int count = 0;
for (ShardRouting shard : this) {
@ -134,9 +183,10 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return count;
}
/**
* Returns a regular shard iterator.
*/
public ShardIterator shardsRandomIt() {
return new PlainShardIterator(shardId, shards, counter.getAndIncrement());
}
public ShardIterator shardsIt() {
return new PlainShardIterator(shardId, shards);
}
@ -145,27 +195,65 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return new PlainShardIterator(shardId, shards, index);
}
public ShardIterator activeShardsRandomIt() {
return new PlainShardIterator(shardId, activeShards, counter.getAndIncrement());
}
public ShardIterator activeShardsIt() {
return new PlainShardIterator(shardId, activeShards);
}
public ShardIterator activeShardsIt(int index) {
return new PlainShardIterator(shardId, activeShards, index);
}
public ShardIterator assignedShardsRandomIt() {
return new PlainShardIterator(shardId, assignedShards, counter.getAndIncrement());
}
public ShardIterator assignedShardsIt() {
return new PlainShardIterator(shardId, assignedShards);
}
public ShardIterator assignedShardsIt(int index) {
return new PlainShardIterator(shardId, assignedShards, index);
}
/**
* Returns an iterator only on the primary shard.
*/
public ShardIterator primaryShardIt() {
ShardRouting primary = primaryShard();
if (primary == null) {
return new PlainShardIterator(shardId, ImmutableList.<ShardRouting>of());
}
return new PlainShardIterator(shardId, ImmutableList.of(primary));
return new PlainShardIterator(shardId, primaryAsList);
}
/**
* Prefers execution on the local node if applicable.
* Prefers execution on the provided node if applicable.
*/
public ShardIterator preferLocalShardsIt(String nodeId) {
ArrayList<ShardRouting> ordered = new ArrayList<ShardRouting>(this.shards.size());
public ShardIterator preferNodeShardsIt(String nodeId) {
return preferNodeShardsIt(nodeId, shards);
}
/**
* Prefers execution on the provided node if applicable.
*/
public ShardIterator preferNodeActiveShardsIt(String nodeId) {
return preferNodeShardsIt(nodeId, activeShards);
}
/**
* Prefers execution on the provided node if applicable.
*/
public ShardIterator preferNodeAssignedShardsIt(String nodeId) {
return preferNodeShardsIt(nodeId, assignedShards);
}
private ShardIterator preferNodeShardsIt(String nodeId, ImmutableList<ShardRouting> shards) {
ArrayList<ShardRouting> ordered = new ArrayList<ShardRouting>(shards.size());
// fill it in a randomized fashion
int index = Math.abs(counter.getAndIncrement());
for (int i = 0; i < this.shards.size(); i++) {
int loc = (index + i) % this.shards.size();
ShardRouting shardRouting = this.shards.get(loc);
for (int i = 0; i < shards.size(); i++) {
int loc = (index + i) % shards.size();
ShardRouting shardRouting = shards.get(loc);
ordered.add(shardRouting);
if (nodeId.equals(shardRouting.currentNodeId())) {
// switch, its the matching node id
@ -176,30 +264,12 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
return new PlainShardIterator(shardId, ordered);
}
/**
* Returns a random shards iterator.
*/
public ShardIterator shardsRandomIt() {
return new PlainShardIterator(shardId, shards, counter.getAndIncrement());
}
public ShardRouting primaryShard() {
for (ShardRouting shardRouting : this) {
if (shardRouting.primary()) {
return shardRouting;
}
}
return null;
return primary;
}
public List<ShardRouting> replicaShards() {
List<ShardRouting> replicaShards = Lists.newArrayListWithCapacity(2);
for (ShardRouting shardRouting : this) {
if (!shardRouting.primary()) {
replicaShards.add(shardRouting);
}
}
return replicaShards;
return this.replicas;
}
public List<ShardRouting> shardsWithState(ShardRoutingState... states) {

View File

@ -19,9 +19,7 @@
package org.elasticsearch.cluster.routing;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
/**
* @author kimchy (shay.banon)
@ -32,11 +30,11 @@ public class PlainShardsIterator implements ShardsIterator {
private final int size;
private final int origIndex;
private final int index;
private volatile int index;
private final int limit;
private volatile int counter = 0;
private volatile int counter;
public PlainShardsIterator(List<ShardRouting> shards) {
this(shards, 0);
@ -45,34 +43,46 @@ public class PlainShardsIterator implements ShardsIterator {
public PlainShardsIterator(List<ShardRouting> shards, int index) {
this.shards = shards;
this.size = shards.size();
this.index = Math.abs(index);
this.origIndex = this.index;
}
@Override public Iterator<ShardRouting> iterator() {
return this;
if (size == 0) {
this.index = 0;
} else {
this.index = Math.abs(index % size);
}
this.counter = this.index;
this.limit = this.index + size;
}
@Override public ShardsIterator reset() {
counter = 0;
index = origIndex;
this.counter = this.index;
return this;
}
@Override public boolean hasNext() {
return counter < size;
@Override public int remaining() {
return limit - counter;
}
@Override public ShardRouting next() throws NoSuchElementException {
if (!hasNext()) {
throw new NoSuchElementException("No shard found");
@Override public ShardRouting firstOrNull() {
if (size == 0) {
return null;
}
counter++;
return shardModulo(index++);
return shards.get((index + 1) % size);
}
@Override public void remove() {
throw new UnsupportedOperationException();
@Override public ShardRouting nextOrNull() {
if (size == 0) {
return null;
}
int counter = (this.counter);
if (counter >= size) {
if (counter >= limit) {
return null;
}
this.counter = counter + 1;
return shards.get(counter - size);
} else {
this.counter = counter + 1;
return shards.get(counter);
}
}
@Override public int size() {
@ -80,122 +90,42 @@ public class PlainShardsIterator implements ShardsIterator {
}
@Override public int sizeActive() {
int shardsActive = 0;
for (ShardRouting shardRouting : shards) {
if (shardRouting.active()) {
shardsActive++;
int count = 0;
for (int i = 0; i < size; i++) {
if (shards.get(i).active()) {
count++;
}
}
return shardsActive;
return count;
}
@Override public boolean hasNextActive() {
int counter = this.counter;
int index = this.index;
while (counter++ < size) {
ShardRouting shardRouting = shardModulo(index++);
if (shardRouting.active()) {
return true;
@Override public int assignedReplicasIncludingRelocating() {
int count = 0;
for (int i = 0; i < size; i++) {
ShardRouting shard = shards.get(i);
if (shard.unassigned()) {
continue;
}
// if the shard is primary and relocating, add one to the counter since we perform it on the replica as well
// (and we already did it on the primary)
if (shard.primary()) {
if (shard.relocating()) {
count++;
}
} else {
count++;
// if we are relocating the replica, we want to perform the index operation on both the relocating
// shard and the target shard. This means that we won't loose index operations between end of recovery
// and reassignment of the shard by the master node
if (shard.relocating()) {
count++;
}
}
}
return false;
return count;
}
@Override public ShardRouting nextActive() throws NoSuchElementException {
ShardRouting shardRouting = nextActiveOrNull();
if (shardRouting == null) {
throw new NoSuchElementException("No active shard found");
}
return shardRouting;
}
@Override public ShardRouting nextActiveOrNull() {
int counter = this.counter;
int index = this.index;
while (counter++ < size) {
ShardRouting shardRouting = shardModulo(index++);
if (shardRouting.active()) {
this.counter = counter;
this.index = index;
return shardRouting;
}
}
this.counter = counter;
this.index = index;
return null;
}
@Override public ShardRouting firstActiveOrNull() {
int counter = 0;
int index = this.origIndex;
while (counter++ < size) {
ShardRouting shardRouting = shardModulo(index++);
if (shardRouting.active()) {
return shardRouting;
}
}
return null;
}
@Override public int sizeAssigned() {
int shardsAssigned = 0;
for (ShardRouting shardRouting : shards) {
if (shardRouting.assignedToNode()) {
shardsAssigned++;
}
}
return shardsAssigned;
}
@Override public boolean hasNextAssigned() {
int counter = this.counter;
int index = this.index;
while (counter++ < size) {
ShardRouting shardRouting = shardModulo(index++);
if (shardRouting.assignedToNode()) {
return true;
}
}
return false;
}
@Override public ShardRouting nextAssigned() throws NoSuchElementException {
ShardRouting shardRouting = nextAssignedOrNull();
if (shardRouting == null) {
throw new NoSuchElementException("No assigned shard found");
}
return shardRouting;
}
@Override public ShardRouting nextAssignedOrNull() {
int counter = this.counter;
int index = this.index;
while (counter++ < size) {
ShardRouting shardRouting = shardModulo(index++);
if (shardRouting.assignedToNode()) {
this.counter = counter;
this.index = index;
return shardRouting;
}
}
this.counter = counter;
this.index = index;
return null;
}
@Override public ShardRouting firstAssignedOrNull() {
int counter = 0;
int index = this.origIndex;
while (counter++ < size) {
ShardRouting shardRouting = shardModulo(index++);
if (shardRouting.assignedToNode()) {
return shardRouting;
}
}
return null;
}
final ShardRouting shardModulo(int counter) {
return shards.get((counter % size));
@Override public Iterable<ShardRouting> asUnordered() {
return shards;
}
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.cluster.routing;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.Iterables;
import org.elasticsearch.common.collect.Lists;
@ -172,8 +173,60 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
return new GroupShardsIterator(set);
}
public GroupShardsIterator allActiveShardsGrouped(String[] indices, boolean includeEmpty) throws IndexMissingException {
// use list here since we need to maintain identity across shards
ArrayList<ShardIterator> set = new ArrayList<ShardIterator>();
if (indices == null || indices.length == 0) {
indices = indicesRouting.keySet().toArray(new String[indicesRouting.keySet().size()]);
}
for (String index : indices) {
IndexRoutingTable indexRoutingTable = index(index);
if (indexRoutingTable == null) {
continue;
// we simply ignore indices that don't exists (make sense for operations that use it currently)
// throw new IndexMissingException(new Index(index));
}
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
for (ShardRouting shardRouting : indexShardRoutingTable) {
if (shardRouting.active()) {
set.add(shardRouting.shardsIt());
} else if (includeEmpty) { // we need this for counting properly, just make it an empty one
set.add(new PlainShardIterator(shardRouting.shardId(), ImmutableList.<ShardRouting>of()));
}
}
}
}
return new GroupShardsIterator(set);
}
public GroupShardsIterator allAssignedShardsGrouped(String[] indices, boolean includeEmpty) throws IndexMissingException {
// use list here since we need to maintain identity across shards
ArrayList<ShardIterator> set = new ArrayList<ShardIterator>();
if (indices == null || indices.length == 0) {
indices = indicesRouting.keySet().toArray(new String[indicesRouting.keySet().size()]);
}
for (String index : indices) {
IndexRoutingTable indexRoutingTable = index(index);
if (indexRoutingTable == null) {
continue;
// we simply ignore indices that don't exists (make sense for operations that use it currently)
// throw new IndexMissingException(new Index(index));
}
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
for (ShardRouting shardRouting : indexShardRoutingTable) {
if (shardRouting.assignedToNode()) {
set.add(shardRouting.shardsIt());
} else if (includeEmpty) { // we need this for counting properly, just make it an empty one
set.add(new PlainShardIterator(shardRouting.shardId(), ImmutableList.<ShardRouting>of()));
}
}
}
}
return new GroupShardsIterator(set);
}
/**
* All the primary shards for the provided indices grouped (each group is a single element, consisting
* All the *active* primary shards for the provided indices grouped (each group is a single element, consisting
* of the primary shard). This is handy for components that expect to get group iterators, but still want in some
* cases to iterate over all primary shards (and not just one shard in replication group).
*
@ -182,7 +235,7 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
* @throws IndexMissingException If an index passed does not exists
* @see IndexRoutingTable#groupByAllIt()
*/
public GroupShardsIterator primaryShardsGrouped(String... indices) throws IndexMissingException {
public GroupShardsIterator activePrimaryShardsGrouped(String[] indices, boolean includeEmpty) throws IndexMissingException {
// use list here since we need to maintain identity across shards
ArrayList<ShardIterator> set = new ArrayList<ShardIterator>();
if (indices == null || indices.length == 0) {
@ -194,7 +247,12 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
throw new IndexMissingException(new Index(index));
}
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
set.add(indexShardRoutingTable.primaryShard().shardsIt());
ShardRouting primary = indexShardRoutingTable.primaryShard();
if (primary.active()) {
set.add(primary.shardsIt());
} else if (includeEmpty) { // we need this for counting properly, just make it an empty one
set.add(new PlainShardIterator(primary.shardId(), ImmutableList.<ShardRouting>of()));
}
}
}
return new GroupShardsIterator(set);

View File

@ -19,15 +19,12 @@
package org.elasticsearch.cluster.routing;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
* Allows to iterate over unrelated shards.
*
* @author kimchy (shay.banon)
*/
public interface ShardsIterator extends Iterable<ShardRouting>, Iterator<ShardRouting> {
public interface ShardsIterator {
/**
* Resets the iterator.
@ -39,80 +36,29 @@ public interface ShardsIterator extends Iterable<ShardRouting>, Iterator<ShardRo
*/
int size();
/**
* The number of active shard routing instances.
*
* @see ShardRouting#active()
*/
int sizeActive();
/**
* Is there an active shard we can iterate to.
*
* @see ShardRouting#active()
*/
boolean hasNextActive();
int assignedReplicasIncludingRelocating();
/**
* Returns the next active shard, or throws {@link NoSuchElementException}.
*
* @see ShardRouting#active()
* Returns the next shard, or <tt>null</tt> if none available.
*/
ShardRouting nextActive() throws NoSuchElementException;
ShardRouting nextOrNull();
/**
* Returns the next active shard, or <tt>null</tt>.
*
* @see ShardRouting#active()
*/
ShardRouting nextActiveOrNull();
/**
* Returns the first active shard, or <tt>null</tt>, without
* incrementing the iterator.
*
* @see ShardRouting#active()
*/
ShardRouting firstActiveOrNull();
/**
* The number of assigned shard routing instances.
*
* @see ShardRouting#assignedToNode()
*/
int sizeAssigned();
/**
* Is there an assigned shard we can iterate to.
*
* @see ShardRouting#assignedToNode()
*/
boolean hasNextAssigned();
/**
* Returns the next assigned shard, or throws {@link NoSuchElementException}.
*
* @see ShardRouting#assignedToNode()
*/
ShardRouting nextAssigned() throws NoSuchElementException;
/**
* Returns the next assigned shard, or <tt>null</tt>.
*
* @see ShardRouting#assignedToNode()
*/
ShardRouting nextAssignedOrNull();
/**
* Returns the first assigned shard, or <tt>null</tt>, wuthout
* Returns the first shard, or <tt>null</tt>, without
* incrementing the iterator.
*
* @see ShardRouting#assignedToNode()
*/
ShardRouting firstAssignedOrNull();
ShardRouting firstOrNull();
int remaining();
int hashCode();
boolean equals(Object other);
Iterable<ShardRouting> asUnordered();
}

View File

@ -67,11 +67,11 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
}
@Override public ShardIterator getShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing, @Nullable String preference) throws IndexMissingException, IndexShardMissingException {
return preferenceShardIterator(shards(clusterState, index, type, id, routing), clusterState.nodes().localNodeId(), preference);
return preferenceActiveShardIterator(shards(clusterState, index, type, id, routing), clusterState.nodes().localNodeId(), preference);
}
@Override public ShardIterator getShards(ClusterState clusterState, String index, int shardId, @Nullable String preference) throws IndexMissingException, IndexShardMissingException {
return preferenceShardIterator(shards(clusterState, index, shardId), clusterState.nodes().localNodeId(), preference);
return preferenceActiveShardIterator(shards(clusterState, index, shardId), clusterState.nodes().localNodeId(), preference);
}
@Override public GroupShardsIterator broadcastDeleteShards(ClusterState clusterState, String index) throws IndexMissingException {
@ -149,7 +149,7 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
throw new IndexShardMissingException(new ShardId(index, shardId));
}
// we might get duplicates, but that's ok, they will override one another
set.add(preferenceShardIterator(indexShard, clusterState.nodes().localNodeId(), preference));
set.add(preferenceActiveShardIterator(indexShard, clusterState.nodes().localNodeId(), preference));
}
}
}
@ -160,19 +160,19 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
for (String index : concreteIndices) {
IndexRoutingTable indexRouting = indexRoutingTable(clusterState, index);
for (IndexShardRoutingTable indexShard : indexRouting) {
set.add(preferenceShardIterator(indexShard, clusterState.nodes().localNodeId(), preference));
set.add(preferenceActiveShardIterator(indexShard, clusterState.nodes().localNodeId(), preference));
}
}
return new GroupShardsIterator(set);
}
}
private ShardIterator preferenceShardIterator(IndexShardRoutingTable indexShard, String nodeId, @Nullable String preference) {
private ShardIterator preferenceActiveShardIterator(IndexShardRoutingTable indexShard, String nodeId, @Nullable String preference) {
if (preference == null) {
return indexShard.shardsRandomIt();
return indexShard.activeShardsRandomIt();
}
if ("_local".equals(preference)) {
return indexShard.preferLocalShardsIt(nodeId);
return indexShard.preferNodeShardsIt(nodeId);
}
if ("_primary".equals(preference)) {
return indexShard.primaryShardIt();

View File

@ -254,6 +254,9 @@ public class MulticastZenPing extends AbstractLifecycleComponent<ZenPing> implem
if (remove) {
receivedResponses.remove(id);
}
if (lifecycle.stoppedOrClosed()) {
return;
}
throw new ZenPingException("Failed to send ping request over multicast on " + multicastSocket, e);
}
}

View File

@ -20,9 +20,12 @@
package org.elasticsearch.cluster.structure;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.index.shard.ShardId;
import org.testng.annotations.Test;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
@ -34,6 +37,152 @@ import static org.hamcrest.Matchers.*;
@Test
public class RoutingIteratorTests {
@Test public void testEmptyIterator() {
ShardIterator shardIterator = new PlainShardIterator(new ShardId("test1", 0), ImmutableList.<ShardRouting>of(), 0);
assertThat(shardIterator.remaining(), equalTo(0));
assertThat(shardIterator.firstOrNull(), nullValue());
assertThat(shardIterator.remaining(), equalTo(0));
assertThat(shardIterator.nextOrNull(), nullValue());
assertThat(shardIterator.remaining(), equalTo(0));
assertThat(shardIterator.nextOrNull(), nullValue());
assertThat(shardIterator.remaining(), equalTo(0));
shardIterator = new PlainShardIterator(new ShardId("test1", 0), ImmutableList.<ShardRouting>of(), 1);
assertThat(shardIterator.remaining(), equalTo(0));
assertThat(shardIterator.firstOrNull(), nullValue());
assertThat(shardIterator.remaining(), equalTo(0));
assertThat(shardIterator.nextOrNull(), nullValue());
assertThat(shardIterator.remaining(), equalTo(0));
assertThat(shardIterator.nextOrNull(), nullValue());
assertThat(shardIterator.remaining(), equalTo(0));
shardIterator = new PlainShardIterator(new ShardId("test1", 0), ImmutableList.<ShardRouting>of(), 2);
assertThat(shardIterator.remaining(), equalTo(0));
assertThat(shardIterator.firstOrNull(), nullValue());
assertThat(shardIterator.remaining(), equalTo(0));
assertThat(shardIterator.nextOrNull(), nullValue());
assertThat(shardIterator.remaining(), equalTo(0));
assertThat(shardIterator.nextOrNull(), nullValue());
assertThat(shardIterator.remaining(), equalTo(0));
shardIterator = new PlainShardIterator(new ShardId("test1", 0), ImmutableList.<ShardRouting>of(), 3);
assertThat(shardIterator.remaining(), equalTo(0));
assertThat(shardIterator.firstOrNull(), nullValue());
assertThat(shardIterator.remaining(), equalTo(0));
assertThat(shardIterator.nextOrNull(), nullValue());
assertThat(shardIterator.remaining(), equalTo(0));
assertThat(shardIterator.nextOrNull(), nullValue());
assertThat(shardIterator.remaining(), equalTo(0));
}
@Test public void testIterator1() {
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(2))
.build();
RoutingTable routingTable = routingTable()
.add(indexRoutingTable("test1").initializeEmpty(metaData.index("test1")))
.build();
ShardIterator shardIterator = routingTable.index("test1").shard(0).shardsIt(0);
assertThat(shardIterator.size(), equalTo(3));
assertThat(shardIterator.firstOrNull(), notNullValue());
assertThat(shardIterator.remaining(), equalTo(3));
assertThat(shardIterator.firstOrNull(), sameInstance(shardIterator.firstOrNull()));
assertThat(shardIterator.remaining(), equalTo(3));
ShardRouting shardRouting1 = shardIterator.nextOrNull();
assertThat(shardRouting1, notNullValue());
assertThat(shardIterator.remaining(), equalTo(2));
ShardRouting shardRouting2 = shardIterator.nextOrNull();
assertThat(shardRouting2, notNullValue());
assertThat(shardIterator.remaining(), equalTo(1));
assertThat(shardRouting2, not(sameInstance(shardRouting1)));
ShardRouting shardRouting3 = shardIterator.nextOrNull();
assertThat(shardRouting3, notNullValue());
assertThat(shardRouting3, not(sameInstance(shardRouting1)));
assertThat(shardRouting3, not(sameInstance(shardRouting2)));
assertThat(shardIterator.nextOrNull(), nullValue());
assertThat(shardIterator.remaining(), equalTo(0));
assertThat(shardIterator.nextOrNull(), nullValue());
assertThat(shardIterator.remaining(), equalTo(0));
}
@Test public void testIterator2() {
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1))
.put(newIndexMetaDataBuilder("test2").numberOfShards(1).numberOfReplicas(1))
.build();
RoutingTable routingTable = routingTable()
.add(indexRoutingTable("test1").initializeEmpty(metaData.index("test1")))
.add(indexRoutingTable("test2").initializeEmpty(metaData.index("test2")))
.build();
ShardIterator shardIterator = routingTable.index("test1").shard(0).shardsIt(0);
assertThat(shardIterator.size(), equalTo(2));
assertThat(shardIterator.firstOrNull(), notNullValue());
assertThat(shardIterator.remaining(), equalTo(2));
assertThat(shardIterator.firstOrNull(), sameInstance(shardIterator.firstOrNull()));
assertThat(shardIterator.remaining(), equalTo(2));
ShardRouting shardRouting1 = shardIterator.nextOrNull();
assertThat(shardRouting1, notNullValue());
assertThat(shardIterator.remaining(), equalTo(1));
ShardRouting shardRouting2 = shardIterator.nextOrNull();
assertThat(shardRouting2, notNullValue());
assertThat(shardIterator.remaining(), equalTo(0));
assertThat(shardRouting2, not(sameInstance(shardRouting1)));
assertThat(shardIterator.nextOrNull(), nullValue());
assertThat(shardIterator.remaining(), equalTo(0));
assertThat(shardIterator.nextOrNull(), nullValue());
assertThat(shardIterator.remaining(), equalTo(0));
shardIterator = routingTable.index("test1").shard(0).shardsIt(1);
assertThat(shardIterator.size(), equalTo(2));
assertThat(shardIterator.firstOrNull(), notNullValue());
assertThat(shardIterator.firstOrNull(), sameInstance(shardIterator.firstOrNull()));
ShardRouting shardRouting3 = shardIterator.nextOrNull();
assertThat(shardRouting1, notNullValue());
ShardRouting shardRouting4 = shardIterator.nextOrNull();
assertThat(shardRouting2, notNullValue());
assertThat(shardRouting2, not(sameInstance(shardRouting1)));
assertThat(shardIterator.nextOrNull(), nullValue());
assertThat(shardIterator.nextOrNull(), nullValue());
assertThat(shardRouting1, not(sameInstance(shardRouting3)));
assertThat(shardRouting2, not(sameInstance(shardRouting4)));
assertThat(shardRouting1, sameInstance(shardRouting4));
assertThat(shardRouting2, sameInstance(shardRouting3));
shardIterator = routingTable.index("test1").shard(0).shardsIt(2);
assertThat(shardIterator.size(), equalTo(2));
assertThat(shardIterator.firstOrNull(), notNullValue());
assertThat(shardIterator.firstOrNull(), sameInstance(shardIterator.firstOrNull()));
ShardRouting shardRouting5 = shardIterator.nextOrNull();
assertThat(shardRouting5, notNullValue());
ShardRouting shardRouting6 = shardIterator.nextOrNull();
assertThat(shardRouting6, notNullValue());
assertThat(shardRouting6, not(sameInstance(shardRouting5)));
assertThat(shardIterator.nextOrNull(), nullValue());
assertThat(shardIterator.nextOrNull(), nullValue());
assertThat(shardRouting5, sameInstance(shardRouting1));
assertThat(shardRouting6, sameInstance(shardRouting2));
shardIterator = routingTable.index("test1").shard(0).shardsIt(3);
assertThat(shardIterator.size(), equalTo(2));
assertThat(shardIterator.firstOrNull(), notNullValue());
assertThat(shardIterator.firstOrNull(), sameInstance(shardIterator.firstOrNull()));
ShardRouting shardRouting7 = shardIterator.nextOrNull();
assertThat(shardRouting7, notNullValue());
ShardRouting shardRouting8 = shardIterator.nextOrNull();
assertThat(shardRouting8, notNullValue());
assertThat(shardRouting8, not(sameInstance(shardRouting7)));
assertThat(shardIterator.nextOrNull(), nullValue());
assertThat(shardIterator.nextOrNull(), nullValue());
assertThat(shardRouting7, sameInstance(shardRouting3));
assertThat(shardRouting8, sameInstance(shardRouting4));
}
@Test public void testRandomRouting() {
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test1").numberOfShards(1).numberOfReplicas(1))
@ -46,12 +195,18 @@ public class RoutingIteratorTests {
.build();
ShardIterator shardIterator = routingTable.index("test1").shard(0).shardsRandomIt();
assertThat(shardIterator.hasNext(), equalTo(true));
ShardRouting shardRouting1 = shardIterator.next();
ShardRouting shardRouting1 = shardIterator.nextOrNull();
assertThat(shardRouting1, notNullValue());
assertThat(shardIterator.nextOrNull(), notNullValue());
assertThat(shardIterator.nextOrNull(), nullValue());
shardIterator = routingTable.index("test1").shard(0).shardsRandomIt();
assertThat(shardIterator.hasNext(), equalTo(true));
ShardRouting shardRouting2 = shardIterator.next();
ShardRouting shardRouting2 = shardIterator.nextOrNull();
assertThat(shardRouting2, notNullValue());
ShardRouting shardRouting3 = shardIterator.nextOrNull();
assertThat(shardRouting3, notNullValue());
assertThat(shardIterator.nextOrNull(), nullValue());
assertThat(shardRouting1, not(sameInstance(shardRouting2)));
assertThat(shardRouting1, sameInstance(shardRouting3));
}
}

View File

@ -119,7 +119,7 @@ public class ThreeShardsEmbeddedSearchTests extends AbstractNodesTests {
List<DfsSearchResult> dfsResults = newArrayList();
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, new String[]{"test"}, null, null, null)) {
for (ShardRouting shardRouting : shardIt) {
for (ShardRouting shardRouting : shardIt.asUnordered()) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder, SearchType.QUERY_THEN_FETCH)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
dfsResults.add(nodeToSearchService.get(shardRouting.currentNodeId()).executeDfsPhase(searchRequest));
@ -187,7 +187,7 @@ public class ThreeShardsEmbeddedSearchTests extends AbstractNodesTests {
List<DfsSearchResult> dfsResults = newArrayList();
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, new String[]{"test"}, null, null, null)) {
for (ShardRouting shardRouting : shardIt) {
for (ShardRouting shardRouting : shardIt.asUnordered()) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder, SearchType.QUERY_THEN_FETCH)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
dfsResults.add(nodeToSearchService.get(shardRouting.currentNodeId()).executeDfsPhase(searchRequest));
@ -281,7 +281,7 @@ public class ThreeShardsEmbeddedSearchTests extends AbstractNodesTests {
Map<SearchShardTarget, QueryFetchSearchResult> queryFetchResults = newHashMap();
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, new String[]{"test"}, null, null, null)) {
for (ShardRouting shardRouting : shardIt) {
for (ShardRouting shardRouting : shardIt.asUnordered()) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder, SearchType.QUERY_AND_FETCH)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
QueryFetchSearchResult queryFetchResult = nodeToSearchService.get(shardRouting.currentNodeId()).executeFetchPhase(searchRequest);
@ -333,7 +333,7 @@ public class ThreeShardsEmbeddedSearchTests extends AbstractNodesTests {
Map<SearchShardTarget, QuerySearchResultProvider> queryResults = newHashMap();
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, new String[]{"test"}, null, null, null)) {
for (ShardRouting shardRouting : shardIt) {
for (ShardRouting shardRouting : shardIt.asUnordered()) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder, SearchType.QUERY_THEN_FETCH)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
QuerySearchResult queryResult = nodeToSearchService.get(shardRouting.currentNodeId()).executeQueryPhase(searchRequest);

View File

@ -127,7 +127,7 @@ public class ThreeShardsUnbalancedShardsEmbeddedSearchTests extends AbstractNode
List<DfsSearchResult> dfsResults = newArrayList();
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, new String[]{"test"}, null, null, null)) {
for (ShardRouting shardRouting : shardIt) {
for (ShardRouting shardRouting : shardIt.asUnordered()) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder, SearchType.DFS_QUERY_THEN_FETCH)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
dfsResults.add(nodeToSearchService.get(shardRouting.currentNodeId()).executeDfsPhase(searchRequest));
@ -194,7 +194,7 @@ public class ThreeShardsUnbalancedShardsEmbeddedSearchTests extends AbstractNode
List<DfsSearchResult> dfsResults = newArrayList();
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, new String[]{"test"}, null, null, null)) {
for (ShardRouting shardRouting : shardIt) {
for (ShardRouting shardRouting : shardIt.asUnordered()) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder, SearchType.DFS_QUERY_THEN_FETCH)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
dfsResults.add(nodeToSearchService.get(shardRouting.currentNodeId()).executeDfsPhase(searchRequest));
@ -284,7 +284,7 @@ public class ThreeShardsUnbalancedShardsEmbeddedSearchTests extends AbstractNode
// do this with dfs, since we have uneven distribution of docs between shards
List<DfsSearchResult> dfsResults = newArrayList();
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, new String[]{"test"}, null, null, null)) {
for (ShardRouting shardRouting : shardIt) {
for (ShardRouting shardRouting : shardIt.asUnordered()) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder, SearchType.QUERY_AND_FETCH)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
dfsResults.add(nodeToSearchService.get(shardRouting.currentNodeId()).executeDfsPhase(searchRequest));
@ -339,7 +339,7 @@ public class ThreeShardsUnbalancedShardsEmbeddedSearchTests extends AbstractNode
Map<SearchShardTarget, QuerySearchResultProvider> queryResults = newHashMap();
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, new String[]{"test"}, null, null, null)) {
for (ShardRouting shardRouting : shardIt) {
for (ShardRouting shardRouting : shardIt.asUnordered()) {
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder, SearchType.QUERY_THEN_FETCH)
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
QuerySearchResult queryResult = nodeToSearchService.get(shardRouting.currentNodeId()).executeQueryPhase(searchRequest);