rename shardsIt to shardIt
This commit is contained in:
parent
a74465000d
commit
7db5e63ab7
|
@ -23,7 +23,7 @@ import org.elasticsearch.action.support.replication.TransportShardReplicationOpe
|
|||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.routing.ShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
|
@ -64,7 +64,7 @@ public class TransportShardReplicationPingAction extends TransportShardReplicati
|
|||
@Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
|
||||
}
|
||||
|
||||
@Override protected ShardsIterator shards(ClusterState clusterState, ShardReplicationPingRequest request) {
|
||||
@Override protected ShardIterator shards(ClusterState clusterState, ShardReplicationPingRequest request) {
|
||||
return clusterService.state().routingTable().index(request.index()).shard(request.shardId()).shardsIt();
|
||||
}
|
||||
}
|
|
@ -29,8 +29,8 @@ import org.elasticsearch.action.support.broadcast.TransportBroadcastOperationAct
|
|||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardsIterator;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
|
@ -93,14 +93,14 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
|
|||
/**
|
||||
* We want to go over all assigned nodes (to get recovery status) and not just active ones.
|
||||
*/
|
||||
@Override protected ShardRouting nextShardOrNull(ShardsIterator shardIt) {
|
||||
@Override protected ShardRouting nextShardOrNull(ShardIterator shardIt) {
|
||||
return shardIt.nextAssignedOrNull();
|
||||
}
|
||||
|
||||
/**
|
||||
* We want to go over all assigned nodes (to get recovery status) and not just active ones.
|
||||
*/
|
||||
@Override protected boolean hasNextShard(ShardsIterator shardIt) {
|
||||
@Override protected boolean hasNextShard(ShardIterator shardIt) {
|
||||
return shardIt.hasNextAssigned();
|
||||
}
|
||||
|
||||
|
|
|
@ -34,7 +34,7 @@ import org.elasticsearch.cluster.ClusterService;
|
|||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.common.UUID;
|
||||
import org.elasticsearch.common.collect.Lists;
|
||||
import org.elasticsearch.common.collect.Maps;
|
||||
|
@ -187,11 +187,11 @@ public class TransportBulkAction extends BaseAction<BulkRequest, BulkResponse> {
|
|||
if (mappingMd != null && mappingMd.routing().required() && deleteRequest.routing() == null) {
|
||||
// if routing is required, and no routing on the delete request, we need to broadcast it....
|
||||
GroupShardsIterator groupShards = clusterService.operationRouting().broadcastDeleteShards(clusterState, deleteRequest.index());
|
||||
for (ShardsIterator shardsId : groupShards) {
|
||||
List<BulkItemRequest> list = requestsByShard.get(shardsId.shardId());
|
||||
for (ShardIterator shardIt : groupShards) {
|
||||
List<BulkItemRequest> list = requestsByShard.get(shardIt.shardId());
|
||||
if (list == null) {
|
||||
list = Lists.newArrayList();
|
||||
requestsByShard.put(shardsId.shardId(), list);
|
||||
requestsByShard.put(shardIt.shardId(), list);
|
||||
}
|
||||
list.add(new BulkItemRequest(i, request));
|
||||
}
|
||||
|
|
|
@ -33,7 +33,7 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
|
|||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.cluster.routing.ShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.common.collect.Sets;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -93,7 +93,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
|
|||
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index());
|
||||
}
|
||||
|
||||
@Override protected ShardsIterator shards(ClusterState clusterState, BulkShardRequest request) {
|
||||
@Override protected ShardIterator shards(ClusterState clusterState, BulkShardRequest request) {
|
||||
return clusterState.routingTable().index(request.index()).shard(request.shardId()).shardsIt();
|
||||
}
|
||||
|
||||
|
|
|
@ -34,7 +34,7 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.cluster.routing.ShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
|
@ -149,7 +149,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
|
|||
indexShard.delete(delete);
|
||||
}
|
||||
|
||||
@Override protected ShardsIterator shards(ClusterState clusterState, DeleteRequest request) {
|
||||
@Override protected ShardIterator shards(ClusterState clusterState, DeleteRequest request) {
|
||||
return clusterService.operationRouting()
|
||||
.deleteShards(clusterService.state(), request.index(), request.type(), request.id(), request.routing());
|
||||
}
|
||||
|
|
|
@ -26,7 +26,7 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
|
@ -83,11 +83,11 @@ public class TransportShardDeleteAction extends TransportShardReplicationOperati
|
|||
indexShard.delete(delete);
|
||||
}
|
||||
|
||||
@Override protected ShardsIterator shards(ClusterState clusterState, ShardDeleteRequest request) {
|
||||
@Override protected ShardIterator shards(ClusterState clusterState, ShardDeleteRequest request) {
|
||||
GroupShardsIterator group = clusterService.operationRouting().broadcastDeleteShards(clusterService.state(), request.index());
|
||||
for (ShardsIterator shards : group) {
|
||||
if (shards.shardId().id() == request.shardId()) {
|
||||
return shards;
|
||||
for (ShardIterator shardIt : group) {
|
||||
if (shardIt.shardId().id() == request.shardId()) {
|
||||
return shardIt;
|
||||
}
|
||||
}
|
||||
throw new ElasticSearchIllegalStateException("No shards iterator found for shard [" + request.shardId() + "]");
|
||||
|
|
|
@ -26,7 +26,7 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
|
@ -75,11 +75,11 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
|
|||
indexShard(shardRequest).deleteByQuery(request.querySource(), request.queryParserName(), request.types());
|
||||
}
|
||||
|
||||
@Override protected ShardsIterator shards(ClusterState clusterState, ShardDeleteByQueryRequest request) {
|
||||
@Override protected ShardIterator shards(ClusterState clusterState, ShardDeleteByQueryRequest request) {
|
||||
GroupShardsIterator group = clusterService.operationRouting().deleteByQueryShards(clusterService.state(), request.index(), request.routing());
|
||||
for (ShardsIterator shards : group) {
|
||||
if (shards.shardId().id() == request.shardId()) {
|
||||
return shards;
|
||||
for (ShardIterator shardIt : group) {
|
||||
if (shardIt.shardId().id() == request.shardId()) {
|
||||
return shardIt;
|
||||
}
|
||||
}
|
||||
throw new ElasticSearchIllegalStateException("No shards iterator found for shard [" + request.shardId() + "]");
|
||||
|
|
|
@ -34,7 +34,7 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
|||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.routing.ShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.common.UUID;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -143,7 +143,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
|
|||
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index());
|
||||
}
|
||||
|
||||
@Override protected ShardsIterator shards(ClusterState clusterState, IndexRequest request) {
|
||||
@Override protected ShardIterator shards(ClusterState clusterState, IndexRequest request) {
|
||||
return clusterService.operationRouting()
|
||||
.indexShards(clusterService.state(), request.index(), request.type(), request.id(), request.routing());
|
||||
}
|
||||
|
|
|
@ -28,8 +28,8 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardsIterator;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.trove.ExtTIntArrayList;
|
||||
import org.elasticsearch.search.SearchPhaseResult;
|
||||
|
@ -123,7 +123,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
|
|||
public void start() {
|
||||
// count the local operations, and perform the non local ones
|
||||
int localOperations = 0;
|
||||
for (final ShardsIterator shardIt : shardsIts) {
|
||||
for (final ShardIterator shardIt : shardsIts) {
|
||||
final ShardRouting shard = shardIt.nextActiveOrNull();
|
||||
if (shard != null) {
|
||||
if (shard.currentNodeId().equals(nodes.localNodeId())) {
|
||||
|
@ -143,7 +143,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
|
|||
request.beforeLocalFork();
|
||||
threadPool.execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
for (final ShardsIterator shardIt : shardsIts) {
|
||||
for (final ShardIterator shardIt : shardsIts) {
|
||||
final ShardRouting shard = shardIt.reset().nextActiveOrNull();
|
||||
if (shard != null) {
|
||||
if (shard.currentNodeId().equals(nodes.localNodeId())) {
|
||||
|
@ -158,7 +158,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
|
|||
if (localAsync) {
|
||||
request.beforeLocalFork();
|
||||
}
|
||||
for (final ShardsIterator shardIt : shardsIts) {
|
||||
for (final ShardIterator shardIt : shardsIts) {
|
||||
final ShardRouting shard = shardIt.reset().nextActiveOrNull();
|
||||
if (shard != null) {
|
||||
if (shard.currentNodeId().equals(nodes.localNodeId())) {
|
||||
|
@ -178,7 +178,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
|
|||
}
|
||||
}
|
||||
|
||||
private void performFirstPhase(final ShardsIterator shardIt) {
|
||||
private void performFirstPhase(final ShardIterator shardIt) {
|
||||
final ShardRouting shard = shardIt.nextActiveOrNull();
|
||||
if (shard == null) {
|
||||
// no more active shards... (we should not really get here, but just for safety)
|
||||
|
@ -201,7 +201,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
|
|||
}
|
||||
}
|
||||
|
||||
private void onFirstPhaseResult(ShardRouting shard, FirstResult result, ShardsIterator shardIt) {
|
||||
private void onFirstPhaseResult(ShardRouting shard, FirstResult result, ShardIterator shardIt) {
|
||||
result.shardTarget(new SearchShardTarget(shard.currentNodeId(), shard.index(), shard.id()));
|
||||
processFirstPhaseResult(shard, result);
|
||||
// increment all the "future" shards to update the total ops since we some may work and some may not...
|
||||
|
@ -227,7 +227,7 @@ public abstract class TransportSearchTypeAction extends BaseAction<SearchRequest
|
|||
}
|
||||
}
|
||||
|
||||
private void onFirstPhaseResult(ShardRouting shard, final ShardsIterator shardIt, Throwable t) {
|
||||
private void onFirstPhaseResult(ShardRouting shard, final ShardIterator shardIt, Throwable t) {
|
||||
if (totalOps.incrementAndGet() == expectedTotalOps) {
|
||||
// e is null when there is no next active....
|
||||
if (logger.isDebugEnabled()) {
|
||||
|
|
|
@ -27,8 +27,8 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardsIterator;
|
||||
import org.elasticsearch.common.collect.ImmutableList;
|
||||
import org.elasticsearch.common.io.ThrowableObjectInputStream;
|
||||
import org.elasticsearch.common.io.ThrowableObjectOutputStream;
|
||||
|
@ -93,21 +93,21 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
|
|||
|
||||
/**
|
||||
* Allows to override how shard routing is iterated over. Default implementation uses
|
||||
* {@link ShardsIterator#nextActiveOrNull()}.
|
||||
* {@link org.elasticsearch.cluster.routing.ShardIterator#nextActiveOrNull()}.
|
||||
*
|
||||
* <p>Note, if overriding this method, make sure to also override {@link #hasNextShard(org.elasticsearch.cluster.routing.ShardsIterator)}.
|
||||
* <p>Note, if overriding this method, make sure to also override {@link #hasNextShard(org.elasticsearch.cluster.routing.ShardIterator)}.
|
||||
*/
|
||||
protected ShardRouting nextShardOrNull(ShardsIterator shardIt) {
|
||||
protected ShardRouting nextShardOrNull(ShardIterator shardIt) {
|
||||
return shardIt.nextActiveOrNull();
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows to override how shard routing is iterated over. Default implementation uses
|
||||
* {@link ShardsIterator#hasNextActive()}.
|
||||
* {@link org.elasticsearch.cluster.routing.ShardIterator#hasNextActive()}.
|
||||
*
|
||||
* <p>Note, if overriding this method, make sure to also override {@link #nextShardOrNull(org.elasticsearch.cluster.routing.ShardsIterator)}.
|
||||
* <p>Note, if overriding this method, make sure to also override {@link #nextShardOrNull(org.elasticsearch.cluster.routing.ShardIterator)}.
|
||||
*/
|
||||
protected boolean hasNextShard(ShardsIterator shardIt) {
|
||||
protected boolean hasNextShard(ShardIterator shardIt) {
|
||||
return shardIt.hasNextActive();
|
||||
}
|
||||
|
||||
|
@ -168,7 +168,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
|
|||
}
|
||||
// count the local operations, and perform the non local ones
|
||||
int localOperations = 0;
|
||||
for (final ShardsIterator shardIt : shardsIts) {
|
||||
for (final ShardIterator shardIt : shardsIts) {
|
||||
final ShardRouting shard = nextShardOrNull(shardIt);
|
||||
if (shard != null) {
|
||||
if (shard.currentNodeId().equals(nodes.localNodeId())) {
|
||||
|
@ -188,7 +188,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
|
|||
request.beforeLocalFork();
|
||||
threadPool.execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
for (final ShardsIterator shardIt : shardsIts) {
|
||||
for (final ShardIterator shardIt : shardsIts) {
|
||||
final ShardRouting shard = nextShardOrNull(shardIt.reset());
|
||||
if (shard != null) {
|
||||
if (shard.currentNodeId().equals(nodes.localNodeId())) {
|
||||
|
@ -203,7 +203,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
|
|||
if (localAsync) {
|
||||
request.beforeLocalFork();
|
||||
}
|
||||
for (final ShardsIterator shardIt : shardsIts) {
|
||||
for (final ShardIterator shardIt : shardsIts) {
|
||||
final ShardRouting shard = nextShardOrNull(shardIt.reset());
|
||||
if (shard != null) {
|
||||
if (shard.currentNodeId().equals(nodes.localNodeId())) {
|
||||
|
@ -215,7 +215,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
|
|||
}
|
||||
}
|
||||
|
||||
private void performOperation(final ShardsIterator shardIt, boolean localAsync) {
|
||||
private void performOperation(final ShardIterator shardIt, boolean localAsync) {
|
||||
final ShardRouting shard = nextShardOrNull(shardIt);
|
||||
if (shard == null) {
|
||||
// no more active shards... (we should not really get here, just safety)
|
||||
|
@ -278,7 +278,7 @@ public abstract class TransportBroadcastOperationAction<Request extends Broadcas
|
|||
}
|
||||
|
||||
@SuppressWarnings({"unchecked"})
|
||||
private void onOperation(ShardRouting shard, final ShardsIterator shardIt, Throwable t, boolean alreadyThreaded) {
|
||||
private void onOperation(ShardRouting shard, final ShardIterator shardIt, Throwable t, boolean alreadyThreaded) {
|
||||
if (!hasNextShard(shardIt)) {
|
||||
// e is null when there is no next active....
|
||||
if (logger.isDebugEnabled()) {
|
||||
|
|
|
@ -26,7 +26,7 @@ import org.elasticsearch.action.support.BaseAction;
|
|||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
@ -78,10 +78,10 @@ public abstract class TransportIndexReplicationOperationAction<Request extends I
|
|||
final AtomicInteger completionCounter = new AtomicInteger(groups.size());
|
||||
final AtomicReferenceArray<Object> shardsResponses = new AtomicReferenceArray<Object>(groups.size());
|
||||
|
||||
for (final ShardsIterator shards : groups) {
|
||||
ShardRequest shardRequest = newShardRequestInstance(request, shards.shardId().id());
|
||||
for (final ShardIterator shardIt : groups) {
|
||||
ShardRequest shardRequest = newShardRequestInstance(request, shardIt.shardId().id());
|
||||
|
||||
// TODO for now, we fork operations on shards of the index
|
||||
// TODO for now, we fork operations on shardIt of the index
|
||||
shardRequest.beforeLocalFork(); // optimize for local fork
|
||||
shardRequest.operationThreaded(true);
|
||||
|
||||
|
|
|
@ -33,8 +33,8 @@ import org.elasticsearch.cluster.TimeoutClusterStateListener;
|
|||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardsIterator;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
|
@ -107,7 +107,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
|
|||
|
||||
protected abstract void shardOperationOnReplica(ShardOperationRequest shardRequest);
|
||||
|
||||
protected abstract ShardsIterator shards(ClusterState clusterState, Request request) throws ElasticSearchException;
|
||||
protected abstract ShardIterator shards(ClusterState clusterState, Request request) throws ElasticSearchException;
|
||||
|
||||
protected abstract boolean checkWriteConsistency();
|
||||
|
||||
|
@ -223,7 +223,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
|
|||
|
||||
private DiscoveryNodes nodes;
|
||||
|
||||
private ShardsIterator shards;
|
||||
private ShardIterator shardIt;
|
||||
|
||||
private final AtomicBoolean primaryOperationStarted = new AtomicBoolean();
|
||||
|
||||
|
@ -261,21 +261,21 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
|
|||
return false;
|
||||
}
|
||||
try {
|
||||
shards = shards(clusterState, request);
|
||||
shardIt = shards(clusterState, request);
|
||||
} catch (Exception e) {
|
||||
listener.onFailure(e);
|
||||
return true;
|
||||
}
|
||||
|
||||
// no shards, might be in the case between index gateway recovery and shards initialization
|
||||
if (shards.size() == 0) {
|
||||
retry(fromClusterEvent, shards.shardId());
|
||||
// no shardIt, might be in the case between index gateway recovery and shardIt initialization
|
||||
if (shardIt.size() == 0) {
|
||||
retry(fromClusterEvent, shardIt.shardId());
|
||||
return false;
|
||||
}
|
||||
|
||||
boolean foundPrimary = false;
|
||||
for (final ShardRouting shard : shards) {
|
||||
// we only deal with primary shards here...
|
||||
for (final ShardRouting shard : shardIt) {
|
||||
// we only deal with primary shardIt here...
|
||||
if (!shard.primary()) {
|
||||
continue;
|
||||
}
|
||||
|
@ -291,13 +291,13 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
|
|||
consistencyLevel = request.consistencyLevel();
|
||||
}
|
||||
int requiredNumber = 1;
|
||||
if (consistencyLevel == WriteConsistencyLevel.QUORUM && shards.size() > 2) {
|
||||
// only for more than 2 in the number of shards it makes sense, otherwise its 1 shard with 1 replica, quorum is 1 (which is what it is initialized to)
|
||||
requiredNumber = (shards.size() / 2) + 1;
|
||||
if (consistencyLevel == WriteConsistencyLevel.QUORUM && shardIt.size() > 2) {
|
||||
// only for more than 2 in the number of shardIt it makes sense, otherwise its 1 shard with 1 replica, quorum is 1 (which is what it is initialized to)
|
||||
requiredNumber = (shardIt.size() / 2) + 1;
|
||||
} else if (consistencyLevel == WriteConsistencyLevel.ALL) {
|
||||
requiredNumber = shards.size();
|
||||
requiredNumber = shardIt.size();
|
||||
}
|
||||
if (shards.sizeActive() < requiredNumber) {
|
||||
if (shardIt.sizeActive() < requiredNumber) {
|
||||
retry(fromClusterEvent, shard.shardId());
|
||||
return false;
|
||||
}
|
||||
|
@ -353,7 +353,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
|
|||
}
|
||||
// we should never get here, but here we go
|
||||
if (!foundPrimary) {
|
||||
final UnavailableShardsException failure = new UnavailableShardsException(shards.shardId(), request.toString());
|
||||
final UnavailableShardsException failure = new UnavailableShardsException(shardIt.shardId(), request.toString());
|
||||
if (request.listenerThreaded()) {
|
||||
threadPool.execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
|
@ -398,7 +398,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
|
|||
return;
|
||||
}
|
||||
clusterService.remove(this);
|
||||
final UnavailableShardsException failure = new UnavailableShardsException(shardId, "[" + shards.size() + "] shards, [" + shards.sizeActive() + "] active : Timeout waiting for [" + timeValue + "], request: " + request.toString());
|
||||
final UnavailableShardsException failure = new UnavailableShardsException(shardId, "[" + shardIt.size() + "] shardIt, [" + shardIt.sizeActive() + "] active : Timeout waiting for [" + timeValue + "], request: " + request.toString());
|
||||
if (request.listenerThreaded()) {
|
||||
threadPool.execute(new Runnable() {
|
||||
@Override public void run() {
|
||||
|
@ -426,12 +426,12 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
|
|||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(shard.shortSummary() + ": Failed to execute [" + request + "]", e);
|
||||
}
|
||||
listener.onFailure(new ReplicationShardOperationFailedException(shards.shardId(), e));
|
||||
listener.onFailure(new ReplicationShardOperationFailedException(shardIt.shardId(), e));
|
||||
}
|
||||
}
|
||||
|
||||
private void performReplicas(final Response response, boolean alreadyThreaded) {
|
||||
if (ignoreReplicas() || shards.size() == 1 /* no replicas */) {
|
||||
if (ignoreReplicas() || shardIt.size() == 1 /* no replicas */) {
|
||||
if (alreadyThreaded || !request.listenerThreaded()) {
|
||||
listener.onResponse(response);
|
||||
} else {
|
||||
|
@ -447,7 +447,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
|
|||
// initialize the counter
|
||||
int replicaCounter = 0;
|
||||
|
||||
for (final ShardRouting shard : shards.reset()) {
|
||||
for (final ShardRouting shard : shardIt.reset()) {
|
||||
// if its unassigned, nothing to do here...
|
||||
if (shard.unassigned()) {
|
||||
continue;
|
||||
|
@ -499,7 +499,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
|
|||
}
|
||||
|
||||
AtomicInteger counter = new AtomicInteger(replicaCounter);
|
||||
for (final ShardRouting shard : shards.reset()) {
|
||||
for (final ShardRouting shard : shardIt.reset()) {
|
||||
// if its unassigned, nothing to do here...
|
||||
if (shard.unassigned()) {
|
||||
continue;
|
||||
|
@ -545,7 +545,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
|
|||
return;
|
||||
}
|
||||
|
||||
final ShardOperationRequest shardRequest = new ShardOperationRequest(shards.shardId().id(), request);
|
||||
final ShardOperationRequest shardRequest = new ShardOperationRequest(shardIt.shardId().id(), request);
|
||||
if (!nodeId.equals(nodes.localNodeId())) {
|
||||
DiscoveryNode node = nodes.get(nodeId);
|
||||
transportService.sendRequest(node, transportReplicaAction(), shardRequest, transportOptions(), new VoidTransportResponseHandler() {
|
||||
|
@ -555,7 +555,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
|
|||
|
||||
@Override public void handleException(TransportException exp) {
|
||||
if (!ignoreReplicaException(exp.unwrapCause())) {
|
||||
logger.warn("Failed to perform " + transportAction() + " on replica " + shards.shardId(), exp);
|
||||
logger.warn("Failed to perform " + transportAction() + " on replica " + shardIt.shardId(), exp);
|
||||
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction() + "] on replica, message [" + detailedMessage(exp) + "]");
|
||||
}
|
||||
finishIfPossible();
|
||||
|
@ -589,7 +589,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
|
|||
shardOperationOnReplica(shardRequest);
|
||||
} catch (Exception e) {
|
||||
if (!ignoreReplicaException(e)) {
|
||||
logger.warn("Failed to perform " + transportAction() + " on replica " + shards.shardId(), e);
|
||||
logger.warn("Failed to perform " + transportAction() + " on replica " + shardIt.shardId(), e);
|
||||
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction() + "] on replica, message [" + detailedMessage(e) + "]");
|
||||
}
|
||||
}
|
||||
|
@ -603,7 +603,7 @@ public abstract class TransportShardReplicationOperationAction<Request extends S
|
|||
shardOperationOnReplica(shardRequest);
|
||||
} catch (Exception e) {
|
||||
if (!ignoreReplicaException(e)) {
|
||||
logger.warn("Failed to perform " + transportAction() + " on replica" + shards.shardId(), e);
|
||||
logger.warn("Failed to perform " + transportAction() + " on replica" + shardIt.shardId(), e);
|
||||
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction() + "] on replica, message [" + detailedMessage(e) + "]");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -28,8 +28,8 @@ import org.elasticsearch.cluster.ClusterService;
|
|||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardsIterator;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
|
@ -82,7 +82,7 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
|
|||
|
||||
private final ActionListener<Response> listener;
|
||||
|
||||
private final ShardsIterator shardsIt;
|
||||
private final ShardIterator shardIt;
|
||||
|
||||
private final Request request;
|
||||
|
||||
|
@ -101,7 +101,7 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
|
|||
|
||||
checkBlock(request, clusterState);
|
||||
|
||||
this.shardsIt = clusterService.operationRouting()
|
||||
this.shardIt = clusterService.operationRouting()
|
||||
.getShards(clusterState, request.index(), request.type(), request.id(), request.routing());
|
||||
}
|
||||
|
||||
|
@ -120,8 +120,8 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
|
|||
* First get should try and use a shard that exists on a local node for better performance
|
||||
*/
|
||||
private void performFirst() {
|
||||
while (shardsIt.hasNextActive()) {
|
||||
final ShardRouting shard = shardsIt.nextActive();
|
||||
while (shardIt.hasNextActive()) {
|
||||
final ShardRouting shard = shardIt.nextActive();
|
||||
if (shard.currentNodeId().equals(nodes.localNodeId())) {
|
||||
if (request.operationThreaded()) {
|
||||
threadPool.execute(new Runnable() {
|
||||
|
@ -154,16 +154,16 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
|
|||
}
|
||||
}
|
||||
}
|
||||
if (!shardsIt.hasNextActive()) {
|
||||
if (!shardIt.hasNextActive()) {
|
||||
// no local node get, go remote
|
||||
shardsIt.reset();
|
||||
shardIt.reset();
|
||||
perform(null);
|
||||
}
|
||||
}
|
||||
|
||||
private void perform(final Exception lastException) {
|
||||
while (shardsIt.hasNextActive()) {
|
||||
final ShardRouting shard = shardsIt.nextActive();
|
||||
while (shardIt.hasNextActive()) {
|
||||
final ShardRouting shard = shardIt.nextActive();
|
||||
// no need to check for local nodes, we tried them already in performFirstGet
|
||||
if (!shard.currentNodeId().equals(nodes.localNodeId())) {
|
||||
DiscoveryNode node = nodes.get(shard.currentNodeId());
|
||||
|
@ -196,13 +196,13 @@ public abstract class TransportShardSingleOperationAction<Request extends Single
|
|||
return;
|
||||
}
|
||||
}
|
||||
if (!shardsIt.hasNextActive()) {
|
||||
if (!shardIt.hasNextActive()) {
|
||||
Exception failure = lastException;
|
||||
if (failure == null) {
|
||||
failure = new NoShardAvailableActionException(shardsIt.shardId(), "No shard available for [" + request.type() + "#" + request.id() + "]");
|
||||
failure = new NoShardAvailableActionException(shardIt.shardId(), "No shard available for [" + request.type() + "#" + request.id() + "]");
|
||||
} else {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug(shardsIt.shardId() + ": Failed to get [" + request.type() + "#" + request.id() + "]", failure);
|
||||
logger.debug(shardIt.shardId() + ": Failed to get [" + request.type() + "#" + request.id() + "]", failure);
|
||||
}
|
||||
}
|
||||
if (request.listenerThreaded()) {
|
||||
|
|
|
@ -25,17 +25,17 @@ import java.util.Iterator;
|
|||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class GroupShardsIterator implements Iterable<ShardsIterator> {
|
||||
public class GroupShardsIterator implements Iterable<ShardIterator> {
|
||||
|
||||
private final Collection<ShardsIterator> iterators;
|
||||
private final Collection<ShardIterator> iterators;
|
||||
|
||||
public GroupShardsIterator(Collection<ShardsIterator> iterators) {
|
||||
public GroupShardsIterator(Collection<ShardIterator> iterators) {
|
||||
this.iterators = iterators;
|
||||
}
|
||||
|
||||
public int totalSize() {
|
||||
int size = 0;
|
||||
for (ShardsIterator shard : iterators) {
|
||||
for (ShardIterator shard : iterators) {
|
||||
size += shard.size();
|
||||
}
|
||||
return size;
|
||||
|
@ -43,7 +43,7 @@ public class GroupShardsIterator implements Iterable<ShardsIterator> {
|
|||
|
||||
public int totalSizeActive() {
|
||||
int size = 0;
|
||||
for (ShardsIterator shard : iterators) {
|
||||
for (ShardIterator shard : iterators) {
|
||||
size += shard.sizeActive();
|
||||
}
|
||||
return size;
|
||||
|
@ -53,11 +53,11 @@ public class GroupShardsIterator implements Iterable<ShardsIterator> {
|
|||
return iterators.size();
|
||||
}
|
||||
|
||||
public Collection<ShardsIterator> iterators() {
|
||||
public Collection<ShardIterator> iterators() {
|
||||
return iterators;
|
||||
}
|
||||
|
||||
@Override public Iterator<ShardsIterator> iterator() {
|
||||
@Override public Iterator<ShardIterator> iterator() {
|
||||
return iterators.iterator();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -133,8 +133,8 @@ public class ImmutableShardRouting implements Streamable, Serializable, ShardRou
|
|||
return shardIdentifier;
|
||||
}
|
||||
|
||||
@Override public ShardsIterator shardsIt() {
|
||||
return new PlainShardsIterator(shardId(), ImmutableList.of((ShardRouting) this));
|
||||
@Override public ShardIterator shardsIt() {
|
||||
return new PlainShardIterator(shardId(), ImmutableList.of((ShardRouting) this));
|
||||
}
|
||||
|
||||
public static ImmutableShardRouting readShardRoutingEntry(StreamInput in) throws IOException {
|
||||
|
|
|
@ -138,12 +138,12 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
|
|||
}
|
||||
|
||||
/**
|
||||
* A group shards iterator where each group ({@link ShardsIterator}
|
||||
* A group shards iterator where each group ({@link ShardIterator}
|
||||
* is an iterator across shard replication group.
|
||||
*/
|
||||
public GroupShardsIterator groupByShardsIt() {
|
||||
// use list here since we need to maintain identity across shards
|
||||
ArrayList<ShardsIterator> set = new ArrayList<ShardsIterator>();
|
||||
ArrayList<ShardIterator> set = new ArrayList<ShardIterator>(shards.size());
|
||||
for (IndexShardRoutingTable indexShard : this) {
|
||||
set.add(indexShard.shardsIt());
|
||||
}
|
||||
|
@ -159,7 +159,7 @@ public class IndexRoutingTable implements Iterable<IndexShardRoutingTable> {
|
|||
*/
|
||||
public GroupShardsIterator groupByAllIt() {
|
||||
// use list here since we need to maintain identity across shards
|
||||
ArrayList<ShardsIterator> set = new ArrayList<ShardsIterator>();
|
||||
ArrayList<ShardIterator> set = new ArrayList<ShardIterator>();
|
||||
for (IndexShardRoutingTable indexShard : this) {
|
||||
for (ShardRouting shardRouting : indexShard) {
|
||||
set.add(shardRouting.shardsIt());
|
||||
|
|
|
@ -90,12 +90,12 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
|
|||
return count;
|
||||
}
|
||||
|
||||
public ShardsIterator shardsIt() {
|
||||
return new IndexShardsIterator(0);
|
||||
public ShardIterator shardsIt() {
|
||||
return new IndexShardIterator(0);
|
||||
}
|
||||
|
||||
public ShardsIterator shardsRandomIt() {
|
||||
return new IndexShardsIterator(nextCounter());
|
||||
public ShardIterator shardsRandomIt() {
|
||||
return new IndexShardIterator(nextCounter());
|
||||
}
|
||||
|
||||
public ShardRouting primaryShard() {
|
||||
|
@ -141,7 +141,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
|
|||
* <p>The class can be used from different threads, though not designed to be used concurrently
|
||||
* from different threads.
|
||||
*/
|
||||
class IndexShardsIterator implements ShardsIterator, Iterator<ShardRouting> {
|
||||
class IndexShardIterator implements ShardIterator, Iterator<ShardRouting> {
|
||||
|
||||
private final int origIndex;
|
||||
|
||||
|
@ -149,7 +149,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
|
|||
|
||||
private volatile int counter = 0;
|
||||
|
||||
private IndexShardsIterator(int index) {
|
||||
private IndexShardIterator(int index) {
|
||||
this.origIndex = index;
|
||||
this.index = index;
|
||||
}
|
||||
|
@ -158,7 +158,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
|
|||
return this;
|
||||
}
|
||||
|
||||
@Override public ShardsIterator reset() {
|
||||
@Override public ShardIterator reset() {
|
||||
counter = 0;
|
||||
index = origIndex;
|
||||
return this;
|
||||
|
@ -283,7 +283,7 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> {
|
|||
@Override public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
|
||||
ShardsIterator that = (ShardsIterator) o;
|
||||
ShardIterator that = (ShardIterator) o;
|
||||
|
||||
if (shardId != null ? !shardId.equals(that.shardId()) : that.shardId() != null) return false;
|
||||
|
||||
|
|
|
@ -28,7 +28,7 @@ import java.util.NoSuchElementException;
|
|||
/**
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public class PlainShardsIterator implements ShardsIterator {
|
||||
public class PlainShardIterator implements ShardIterator {
|
||||
|
||||
private final ShardId shardId;
|
||||
|
||||
|
@ -36,12 +36,12 @@ public class PlainShardsIterator implements ShardsIterator {
|
|||
|
||||
private volatile int counter = 0;
|
||||
|
||||
public PlainShardsIterator(ShardId shardId, List<ShardRouting> shards) {
|
||||
public PlainShardIterator(ShardId shardId, List<ShardRouting> shards) {
|
||||
this.shardId = shardId;
|
||||
this.shards = shards;
|
||||
}
|
||||
|
||||
@Override public ShardsIterator reset() {
|
||||
@Override public ShardIterator reset() {
|
||||
this.counter = 0;
|
||||
return this;
|
||||
}
|
||||
|
@ -152,7 +152,7 @@ public class PlainShardsIterator implements ShardsIterator {
|
|||
@Override public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
|
||||
ShardsIterator that = (ShardsIterator) o;
|
||||
ShardIterator that = (ShardIterator) o;
|
||||
|
||||
if (shardId != null ? !shardId.equals(that.shardId()) : that.shardId() != null) return false;
|
||||
|
|
@ -145,7 +145,7 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
|
|||
*/
|
||||
public GroupShardsIterator allShardsGrouped(String... indices) throws IndexMissingException {
|
||||
// use list here since we need to maintain identity across shards
|
||||
ArrayList<ShardsIterator> set = new ArrayList<ShardsIterator>();
|
||||
ArrayList<ShardIterator> set = new ArrayList<ShardIterator>();
|
||||
if (indices == null || indices.length == 0) {
|
||||
indices = indicesRouting.keySet().toArray(new String[indicesRouting.keySet().size()]);
|
||||
}
|
||||
|
@ -177,7 +177,7 @@ public class RoutingTable implements Iterable<IndexRoutingTable> {
|
|||
*/
|
||||
public GroupShardsIterator primaryShardsGrouped(String... indices) throws IndexMissingException {
|
||||
// use list here since we need to maintain identity across shards
|
||||
ArrayList<ShardsIterator> set = new ArrayList<ShardsIterator>();
|
||||
ArrayList<ShardIterator> set = new ArrayList<ShardIterator>();
|
||||
if (indices == null || indices.length == 0) {
|
||||
indices = indicesRouting.keySet().toArray(new String[indicesRouting.keySet().size()]);
|
||||
}
|
||||
|
|
|
@ -29,7 +29,7 @@ import java.util.NoSuchElementException;
|
|||
*
|
||||
* @author kimchy (shay.banon)
|
||||
*/
|
||||
public interface ShardsIterator extends Iterable<ShardRouting>, Iterator<ShardRouting> {
|
||||
public interface ShardIterator extends Iterable<ShardRouting>, Iterator<ShardRouting> {
|
||||
|
||||
/**
|
||||
* The shard id this group relates to.
|
||||
|
@ -39,7 +39,7 @@ public interface ShardsIterator extends Iterable<ShardRouting>, Iterator<ShardRo
|
|||
/**
|
||||
* Resets the iterator.
|
||||
*/
|
||||
ShardsIterator reset();
|
||||
ShardIterator reset();
|
||||
|
||||
/**
|
||||
* The number of shard routing instances.
|
|
@ -118,7 +118,7 @@ public interface ShardRouting extends Streamable, Serializable {
|
|||
/**
|
||||
* A shard iterator with just this shard in it.
|
||||
*/
|
||||
ShardsIterator shardsIt();
|
||||
ShardIterator shardsIt();
|
||||
|
||||
/**
|
||||
* Does not write index name and shard id
|
||||
|
|
|
@ -21,7 +21,7 @@ package org.elasticsearch.cluster.routing.operation;
|
|||
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.index.IndexShardMissingException;
|
||||
import org.elasticsearch.indices.IndexMissingException;
|
||||
|
||||
|
@ -32,13 +32,13 @@ import javax.annotation.Nullable;
|
|||
*/
|
||||
public interface OperationRouting {
|
||||
|
||||
ShardsIterator indexShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing) throws IndexMissingException, IndexShardMissingException;
|
||||
ShardIterator indexShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing) throws IndexMissingException, IndexShardMissingException;
|
||||
|
||||
ShardsIterator deleteShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing) throws IndexMissingException, IndexShardMissingException;
|
||||
ShardIterator deleteShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing) throws IndexMissingException, IndexShardMissingException;
|
||||
|
||||
GroupShardsIterator broadcastDeleteShards(ClusterState clusterState, String index) throws IndexMissingException, IndexShardMissingException;
|
||||
|
||||
ShardsIterator getShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing) throws IndexMissingException, IndexShardMissingException;
|
||||
ShardIterator getShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing) throws IndexMissingException, IndexShardMissingException;
|
||||
|
||||
GroupShardsIterator deleteByQueryShards(ClusterState clusterState, String index, @Nullable String routing) throws IndexMissingException;
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
|
|||
import org.elasticsearch.cluster.routing.GroupShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.cluster.routing.operation.OperationRouting;
|
||||
import org.elasticsearch.cluster.routing.operation.hash.HashFunction;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
|
@ -57,15 +57,15 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
|
|||
this.useType = indexSettings.getAsBoolean("cluster.routing.operation.use_type", false);
|
||||
}
|
||||
|
||||
@Override public ShardsIterator indexShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing) throws IndexMissingException, IndexShardMissingException {
|
||||
@Override public ShardIterator indexShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing) throws IndexMissingException, IndexShardMissingException {
|
||||
return shards(clusterState, index, type, id, routing).shardsIt();
|
||||
}
|
||||
|
||||
@Override public ShardsIterator deleteShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing) throws IndexMissingException, IndexShardMissingException {
|
||||
@Override public ShardIterator deleteShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing) throws IndexMissingException, IndexShardMissingException {
|
||||
return shards(clusterState, index, type, id, routing).shardsIt();
|
||||
}
|
||||
|
||||
@Override public ShardsIterator getShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing) throws IndexMissingException, IndexShardMissingException {
|
||||
@Override public ShardIterator getShards(ClusterState clusterState, String index, String type, String id, @Nullable String routing) throws IndexMissingException, IndexShardMissingException {
|
||||
return shards(clusterState, index, type, id, routing).shardsRandomIt();
|
||||
}
|
||||
|
||||
|
@ -84,7 +84,7 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
|
|||
}
|
||||
|
||||
// we use set here and not identity set since we might get duplicates
|
||||
HashSet<ShardsIterator> set = new HashSet<ShardsIterator>();
|
||||
HashSet<ShardIterator> set = new HashSet<ShardIterator>();
|
||||
IndexRoutingTable indexRouting = indexRoutingTable(clusterState, index);
|
||||
for (String r : routings) {
|
||||
int shardId = shardId(clusterState, index, null, null, r);
|
||||
|
@ -109,7 +109,7 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
|
|||
|
||||
if (routings != null && routings.length > 0) {
|
||||
// we use set here and not list since we might get duplicates
|
||||
HashSet<ShardsIterator> set = new HashSet<ShardsIterator>();
|
||||
HashSet<ShardIterator> set = new HashSet<ShardIterator>();
|
||||
for (String index : indices) {
|
||||
IndexRoutingTable indexRouting = indexRoutingTable(clusterState, index);
|
||||
for (String r : routings) {
|
||||
|
@ -125,7 +125,7 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio
|
|||
return new GroupShardsIterator(set);
|
||||
} else {
|
||||
// we use list here since we know we are not going to create duplicates
|
||||
ArrayList<ShardsIterator> set = new ArrayList<ShardsIterator>();
|
||||
ArrayList<ShardIterator> set = new ArrayList<ShardIterator>();
|
||||
for (String index : indices) {
|
||||
IndexRoutingTable indexRouting = indexRoutingTable(clusterState, index);
|
||||
for (IndexShardRoutingTable indexShard : indexRouting) {
|
||||
|
|
|
@ -22,8 +22,8 @@ package org.elasticsearch.test.integration.search;
|
|||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.Requests;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardsIterator;
|
||||
import org.elasticsearch.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.common.collect.Sets;
|
||||
import org.elasticsearch.common.trove.ExtTIntArrayList;
|
||||
|
@ -114,8 +114,8 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests {
|
|||
.from(0).size(60).explain(true).indexBoost("test", 1.0f).indexBoost("test2", 2.0f);
|
||||
|
||||
List<DfsSearchResult> dfsResults = newArrayList();
|
||||
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
|
||||
for (ShardRouting shardRouting : shardsIt) {
|
||||
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
|
||||
for (ShardRouting shardRouting : shardIt) {
|
||||
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder)
|
||||
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
|
||||
dfsResults.add(nodeToSearchService.get(shardRouting.currentNodeId()).executeDfsPhase(searchRequest));
|
||||
|
@ -182,8 +182,8 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests {
|
|||
.from(0).size(60).explain(true).sort("age", SortOrder.ASC);
|
||||
|
||||
List<DfsSearchResult> dfsResults = newArrayList();
|
||||
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
|
||||
for (ShardRouting shardRouting : shardsIt) {
|
||||
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
|
||||
for (ShardRouting shardRouting : shardIt) {
|
||||
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder)
|
||||
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
|
||||
dfsResults.add(nodeToSearchService.get(shardRouting.currentNodeId()).executeDfsPhase(searchRequest));
|
||||
|
@ -276,8 +276,8 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests {
|
|||
}
|
||||
|
||||
Map<SearchShardTarget, QueryFetchSearchResult> queryFetchResults = newHashMap();
|
||||
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
|
||||
for (ShardRouting shardRouting : shardsIt) {
|
||||
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
|
||||
for (ShardRouting shardRouting : shardIt) {
|
||||
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder)
|
||||
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
|
||||
QueryFetchSearchResult queryFetchResult = nodeToSearchService.get(shardRouting.currentNodeId()).executeFetchPhase(searchRequest);
|
||||
|
@ -328,8 +328,8 @@ public class TwoInstanceEmbeddedSearchTests extends AbstractNodesTests {
|
|||
.facet(FacetBuilders.queryFacet("test1", termQuery("name", "test1")));
|
||||
|
||||
Map<SearchShardTarget, QuerySearchResultProvider> queryResults = newHashMap();
|
||||
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
|
||||
for (ShardRouting shardRouting : shardsIt) {
|
||||
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
|
||||
for (ShardRouting shardRouting : shardIt) {
|
||||
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder)
|
||||
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
|
||||
QuerySearchResult queryResult = nodeToSearchService.get(shardRouting.currentNodeId()).executeQueryPhase(searchRequest);
|
||||
|
|
|
@ -22,8 +22,8 @@ package org.elasticsearch.test.integration.search;
|
|||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.Requests;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.routing.ShardIterator;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardsIterator;
|
||||
import org.elasticsearch.cluster.routing.operation.OperationRouting;
|
||||
import org.elasticsearch.cluster.routing.operation.plain.PlainOperationRouting;
|
||||
import org.elasticsearch.common.collect.ImmutableMap;
|
||||
|
@ -118,8 +118,8 @@ public class TwoInstanceUnbalancedShardsEmbeddedSearchTests extends AbstractNode
|
|||
.from(0).size(60).explain(true);
|
||||
|
||||
List<DfsSearchResult> dfsResults = newArrayList();
|
||||
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
|
||||
for (ShardRouting shardRouting : shardsIt) {
|
||||
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
|
||||
for (ShardRouting shardRouting : shardIt) {
|
||||
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder)
|
||||
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
|
||||
dfsResults.add(nodeToSearchService.get(shardRouting.currentNodeId()).executeDfsPhase(searchRequest));
|
||||
|
@ -185,8 +185,8 @@ public class TwoInstanceUnbalancedShardsEmbeddedSearchTests extends AbstractNode
|
|||
.from(0).size(60).explain(true).sort("age", SortOrder.ASC);
|
||||
|
||||
List<DfsSearchResult> dfsResults = newArrayList();
|
||||
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
|
||||
for (ShardRouting shardRouting : shardsIt) {
|
||||
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
|
||||
for (ShardRouting shardRouting : shardIt) {
|
||||
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder)
|
||||
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
|
||||
dfsResults.add(nodeToSearchService.get(shardRouting.currentNodeId()).executeDfsPhase(searchRequest));
|
||||
|
@ -275,8 +275,8 @@ public class TwoInstanceUnbalancedShardsEmbeddedSearchTests extends AbstractNode
|
|||
|
||||
// do this with dfs, since we have uneven distribution of docs between shards
|
||||
List<DfsSearchResult> dfsResults = newArrayList();
|
||||
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
|
||||
for (ShardRouting shardRouting : shardsIt) {
|
||||
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
|
||||
for (ShardRouting shardRouting : shardIt) {
|
||||
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder)
|
||||
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
|
||||
dfsResults.add(nodeToSearchService.get(shardRouting.currentNodeId()).executeDfsPhase(searchRequest));
|
||||
|
@ -330,8 +330,8 @@ public class TwoInstanceUnbalancedShardsEmbeddedSearchTests extends AbstractNode
|
|||
.facet(queryFacet("test1").query(termQuery("name", "test1")));
|
||||
|
||||
Map<SearchShardTarget, QuerySearchResultProvider> queryResults = newHashMap();
|
||||
for (ShardsIterator shardsIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
|
||||
for (ShardRouting shardRouting : shardsIt) {
|
||||
for (ShardIterator shardIt : clusterService.operationRouting().searchShards(clusterService.state(), new String[]{"test"}, null, null)) {
|
||||
for (ShardRouting shardRouting : shardIt) {
|
||||
InternalSearchRequest searchRequest = searchRequest(shardRouting, sourceBuilder)
|
||||
.scroll(new Scroll(new TimeValue(10, TimeUnit.MINUTES)));
|
||||
QuerySearchResult queryResult = nodeToSearchService.get(shardRouting.currentNodeId()).executeQueryPhase(searchRequest);
|
||||
|
|
Loading…
Reference in New Issue