diff --git a/src/main/java/org/elasticsearch/action/NoShardAvailableActionException.java b/src/main/java/org/elasticsearch/action/NoShardAvailableActionException.java index 833e35ee9bd..05f57967493 100644 --- a/src/main/java/org/elasticsearch/action/NoShardAvailableActionException.java +++ b/src/main/java/org/elasticsearch/action/NoShardAvailableActionException.java @@ -21,12 +21,17 @@ package org.elasticsearch.action; import org.elasticsearch.index.shard.IndexShardException; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.rest.RestStatus; /** * */ public class NoShardAvailableActionException extends IndexShardException { + public NoShardAvailableActionException(ShardId shardId) { + super(shardId, null); + } + public NoShardAvailableActionException(ShardId shardId, String msg) { super(shardId, msg); } @@ -35,4 +40,8 @@ public class NoShardAvailableActionException extends IndexShardException { super(shardId, msg, cause); } + @Override + public RestStatus status() { + return RestStatus.SERVICE_UNAVAILABLE; + } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/action/search/SearchPhaseExecutionException.java b/src/main/java/org/elasticsearch/action/search/SearchPhaseExecutionException.java index acc04e24ee2..b2a39fd8354 100644 --- a/src/main/java/org/elasticsearch/action/search/SearchPhaseExecutionException.java +++ b/src/main/java/org/elasticsearch/action/search/SearchPhaseExecutionException.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.search; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.rest.RestStatus; /** * @@ -42,6 +43,23 @@ public class SearchPhaseExecutionException extends ElasticSearchException { this.shardFailures = shardFailures; } + @Override + public RestStatus status() { + if (shardFailures.length == 0) { + // if no successful shards, it means no active shards, so just return SERVICE_UNAVAILABLE + return RestStatus.SERVICE_UNAVAILABLE; + } + RestStatus status = shardFailures[0].status(); + if (shardFailures.length > 1) { + for (int i = 1; i < shardFailures.length; i++) { + if (shardFailures[i].status().getStatus() >= 500) { + status = shardFailures[i].status(); + } + } + } + return status; + } + public String phaseName() { return phaseName; } diff --git a/src/main/java/org/elasticsearch/action/search/ShardSearchFailure.java b/src/main/java/org/elasticsearch/action/search/ShardSearchFailure.java index 72ff8a56301..79cdd061cc6 100644 --- a/src/main/java/org/elasticsearch/action/search/ShardSearchFailure.java +++ b/src/main/java/org/elasticsearch/action/search/ShardSearchFailure.java @@ -41,16 +41,21 @@ public class ShardSearchFailure implements ShardOperationFailedException { public static final ShardSearchFailure[] EMPTY_ARRAY = new ShardSearchFailure[0]; private SearchShardTarget shardTarget; - private String reason; - private RestStatus status; + private transient Throwable failure; private ShardSearchFailure() { } + @Nullable + public Throwable failure() { + return failure; + } + public ShardSearchFailure(Throwable t) { + this.failure = t; Throwable actual = ExceptionsHelper.unwrapCause(t); if (actual != null && actual instanceof SearchException) { this.shardTarget = ((SearchException) actual).shard(); diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java index c852c999b58..9bdc98001df 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java @@ -20,7 +20,10 @@ package org.elasticsearch.action.search.type; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.*; +import org.elasticsearch.action.search.ReduceSearchPhaseException; +import org.elasticsearch.action.search.SearchOperationThreading; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.inject.Inject; @@ -144,7 +147,7 @@ public class TransportSearchDfsQueryAndFetchAction extends TransportSearchTypeAc if (logger.isDebugEnabled()) { logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id()); } - AsyncAction.this.addShardFailure(shardIndex, new ShardSearchFailure(t)); + AsyncAction.this.addShardFailure(shardIndex, t); successulOps.decrementAndGet(); if (counter.decrementAndGet() == 0) { finishHim(); diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java index d60ed75ac1f..4aec87d3619 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java @@ -20,7 +20,10 @@ package org.elasticsearch.action.search.type; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.*; +import org.elasticsearch.action.search.ReduceSearchPhaseException; +import org.elasticsearch.action.search.SearchOperationThreading; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.inject.Inject; @@ -153,7 +156,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA if (logger.isDebugEnabled()) { logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id()); } - AsyncAction.this.addShardFailure(shardIndex, new ShardSearchFailure(t)); + AsyncAction.this.addShardFailure(shardIndex, t); successulOps.decrementAndGet(); if (counter.decrementAndGet() == 0) { executeFetchPhase(); @@ -246,7 +249,7 @@ public class TransportSearchDfsQueryThenFetchAction extends TransportSearchTypeA if (logger.isDebugEnabled()) { logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id()); } - AsyncAction.this.addShardFailure(shardIndex, new ShardSearchFailure(t)); + AsyncAction.this.addShardFailure(shardIndex, t); successulOps.decrementAndGet(); if (counter.decrementAndGet() == 0) { finishHim(); diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java index 7651d046d06..08e3940c6f2 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java @@ -20,7 +20,10 @@ package org.elasticsearch.action.search.type; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.*; +import org.elasticsearch.action.search.ReduceSearchPhaseException; +import org.elasticsearch.action.search.SearchOperationThreading; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.inject.Inject; @@ -155,7 +158,7 @@ public class TransportSearchQueryThenFetchAction extends TransportSearchTypeActi if (logger.isDebugEnabled()) { logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id()); } - AsyncAction.this.addShardFailure(shardIndex, new ShardSearchFailure(t)); + AsyncAction.this.addShardFailure(shardIndex, t); successulOps.decrementAndGet(); if (counter.decrementAndGet() == 0) { finishHim(); diff --git a/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java b/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java index 7c22700db28..076469c418a 100644 --- a/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java +++ b/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java @@ -20,7 +20,9 @@ package org.elasticsearch.action.search.type; import org.apache.lucene.search.ScoreDoc; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.search.*; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.cluster.ClusterService; @@ -35,7 +37,9 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.trove.ExtTIntArrayList; import org.elasticsearch.common.util.concurrent.AtomicArray; -import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.index.IndexShardMissingException; +import org.elasticsearch.index.shard.IllegalIndexShardStateException; +import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.action.SearchServiceListener; @@ -90,7 +94,7 @@ public abstract class TransportSearchTypeAction extends TransportAction firstResults; - private volatile AtomicArray shardFailures; + private final AtomicArray shardFailures; protected volatile ScoreDoc[] sortedShardList; protected final long startTime = System.currentTimeMillis(); @@ -123,6 +127,7 @@ public abstract class TransportSearchTypeAction extends TransportAction(shardsIts.size()); + shardFailures = new AtomicArray(shardsIts.size()); } public void start() { @@ -142,7 +147,7 @@ public abstract class TransportSearchTypeAction extends TransportAction() { @@ -242,6 +247,10 @@ public abstract class TransportSearchTypeAction extends TransportAction(shardsIts.size()); + protected final void addShardFailure(final int shardIndex, Throwable t) { + ShardSearchFailure failure = shardFailures.get(shardIndex); + if (failure == null) { + shardFailures.set(shardIndex, new ShardSearchFailure(t)); + } else { + // the failure is already present, try and not override it with an exception that is less meaningless + // for example, getting illegal shard state + if (isOverrideException(t)) { + shardFailures.set(shardIndex, new ShardSearchFailure(t)); + } } - shardFailures.set(shardIndex, failure); + } + + protected boolean isOverrideException(Throwable t) { + Throwable actual = ExceptionsHelper.unwrapCause(t); + if (actual instanceof IllegalIndexShardStateException) { + return false; + } + if (actual instanceof IndexMissingException) { + return false; + } + if (actual instanceof IndexShardMissingException) { + return false; + } + if (actual instanceof NoShardAvailableActionException) { + return false; + } + return false; } /** @@ -359,6 +376,8 @@ public abstract class TransportSearchTypeAction extends TransportAction listener) { @@ -190,18 +189,20 @@ public abstract class TransportBroadcastOperationAction() { @Override @@ -286,12 +291,12 @@ public abstract class TransportBroadcastOperationAction { diff --git a/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java b/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java index 427f998ba4e..7a41abd0dbc 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java +++ b/src/main/java/org/elasticsearch/cluster/routing/IndexShardRoutingTable.java @@ -52,6 +52,7 @@ public class IndexShardRoutingTable implements Iterable { final ImmutableList shards; final ImmutableList activeShards; final ImmutableList assignedShards; + final ImmutableList initializingShards; final AtomicInteger counter; @@ -67,6 +68,7 @@ public class IndexShardRoutingTable implements Iterable { ImmutableList.Builder replicas = ImmutableList.builder(); ImmutableList.Builder activeShards = ImmutableList.builder(); ImmutableList.Builder assignedShards = ImmutableList.builder(); + ImmutableList.Builder initializingShards = ImmutableList.builder(); for (ShardRouting shard : shards) { if (shard.primary()) { @@ -77,6 +79,9 @@ public class IndexShardRoutingTable implements Iterable { if (shard.active()) { activeShards.add(shard); } + if (shard.initializing()) { + initializingShards.add(shard); + } if (shard.assignedToNode()) { assignedShards.add(shard); } @@ -91,6 +96,7 @@ public class IndexShardRoutingTable implements Iterable { this.replicas = replicas.build(); this.activeShards = activeShards.build(); this.assignedShards = assignedShards.build(); + this.initializingShards = initializingShards.build(); } /** @@ -243,7 +249,7 @@ public class IndexShardRoutingTable implements Iterable { } public ShardIterator shardsRandomIt() { - return new PlainShardIterator(shardId, shards, counter.getAndIncrement()); + return new PlainShardIterator(shardId, shards, pickIndex()); } public ShardIterator shardsIt() { @@ -255,7 +261,7 @@ public class IndexShardRoutingTable implements Iterable { } public ShardIterator activeShardsRandomIt() { - return new PlainShardIterator(shardId, activeShards, counter.getAndIncrement()); + return new PlainShardIterator(shardId, activeShards, pickIndex()); } public ShardIterator activeShardsIt() { @@ -266,8 +272,30 @@ public class IndexShardRoutingTable implements Iterable { return new PlainShardIterator(shardId, activeShards, index); } + /** + * Returns an iterator over active and initializing shards. Making sure though that + * its random within the active shards, and initializing shards are the last to iterate through. + */ + public ShardIterator activeInitializingShardsRandomIt() { + return activeInitializingShardsIt(pickIndex()); + } + + /** + * Returns an iterator over active and initializing shards. Making sure though that + * its random within the active shards, and initializing shards are the last to iterate through. + */ + public ShardIterator activeInitializingShardsIt(int index) { + if (initializingShards.isEmpty()) { + return new PlainShardIterator(shardId, activeShards, index); + } + ArrayList ordered = new ArrayList(activeShards.size() + initializingShards.size()); + addToListFromIndex(activeShards, ordered, index); + ordered.addAll(initializingShards); + return new PlainShardIterator(shardId, ordered); + } + public ShardIterator assignedShardsRandomIt() { - return new PlainShardIterator(shardId, assignedShards, counter.getAndIncrement()); + return new PlainShardIterator(shardId, assignedShards, pickIndex()); } public ShardIterator assignedShardsIt() { @@ -284,19 +312,19 @@ public class IndexShardRoutingTable implements Iterable { public ShardIterator primaryShardIt() { return new PlainShardIterator(shardId, primaryAsList); } - - public ShardIterator primaryActiveShardIt() { - if (!primaryAsList.isEmpty() && !primaryAsList.get(0).active()) { + + public ShardIterator primaryActiveInitializingShardIt() { + if (!primaryAsList.isEmpty() && !primaryAsList.get(0).active() && !primaryAsList.get(0).initializing()) { List primaryList = ImmutableList.of(); return new PlainShardIterator(shardId, primaryList); } return primaryShardIt(); } - public ShardIterator primaryFirstActiveShardsIt() { - ArrayList ordered = new ArrayList(activeShards.size()); + public ShardIterator primaryFirstActiveInitializingShardsIt() { + ArrayList ordered = new ArrayList(activeShards.size() + initializingShards.size()); // fill it in a randomized fashion - int index = Math.abs(counter.getAndIncrement()); + int index = Math.abs(pickIndex()); for (int i = 0; i < activeShards.size(); i++) { int loc = (index + i) % activeShards.size(); ShardRouting shardRouting = activeShards.get(loc); @@ -307,21 +335,24 @@ public class IndexShardRoutingTable implements Iterable { ordered.set(0, shardRouting); } } + // no need to worry about primary first here..., its temporal + if (!initializingShards.isEmpty()) { + ordered.addAll(initializingShards); + } return new PlainShardIterator(shardId, ordered); } - /** - * Prefers execution on the provided node if applicable. - */ - public ShardIterator preferNodeShardsIt(String nodeId) { - return preferNodeShardsIt(nodeId, shards); - } - - public ShardIterator onlyNodeActiveShardsIt(String nodeId) { - ArrayList ordered = new ArrayList(shards.size()); + public ShardIterator onlyNodeActiveInitializingShardsIt(String nodeId) { + ArrayList ordered = new ArrayList(activeShards.size() + initializingShards.size()); // fill it in a randomized fashion - for (int i = 0; i < shards.size(); i++) { - ShardRouting shardRouting = shards.get(i); + for (int i = 0; i < activeShards.size(); i++) { + ShardRouting shardRouting = activeShards.get(i); + if (nodeId.equals(shardRouting.currentNodeId())) { + ordered.add(shardRouting); + } + } + for (int i = 0; i < initializingShards.size(); i++) { + ShardRouting shardRouting = initializingShards.get(i); if (nodeId.equals(shardRouting.currentNodeId())) { ordered.add(shardRouting); } @@ -329,27 +360,13 @@ public class IndexShardRoutingTable implements Iterable { return new PlainShardIterator(shardId, ordered); } - /** - * Prefers execution on the provided node if applicable. - */ - public ShardIterator preferNodeActiveShardsIt(String nodeId) { - return preferNodeShardsIt(nodeId, activeShards); - } - - /** - * Prefers execution on the provided node if applicable. - */ - public ShardIterator preferNodeAssignedShardsIt(String nodeId) { - return preferNodeShardsIt(nodeId, assignedShards); - } - - private ShardIterator preferNodeShardsIt(String nodeId, ImmutableList shards) { - ArrayList ordered = new ArrayList(shards.size()); + public ShardIterator preferNodeActiveInitializingShardsIt(String nodeId) { + ArrayList ordered = new ArrayList(activeShards.size() + initializingShards.size()); // fill it in a randomized fashion - int index = Math.abs(counter.getAndIncrement()); - for (int i = 0; i < shards.size(); i++) { - int loc = (index + i) % shards.size(); - ShardRouting shardRouting = shards.get(loc); + int index = pickIndex(); + for (int i = 0; i < activeShards.size(); i++) { + int loc = (index + i) % activeShards.size(); + ShardRouting shardRouting = activeShards.get(loc); ordered.add(shardRouting); if (nodeId.equals(shardRouting.currentNodeId())) { // switch, its the matching node id @@ -357,6 +374,9 @@ public class IndexShardRoutingTable implements Iterable { ordered.set(0, shardRouting); } } + if (!initializingShards.isEmpty()) { + ordered.addAll(initializingShards); + } return new PlainShardIterator(shardId, ordered); } @@ -393,20 +413,16 @@ public class IndexShardRoutingTable implements Iterable { } private volatile Map activeShardsByAttributes = ImmutableMap.of(); + private volatile Map initializingShardsByAttributes = ImmutableMap.of(); private final Object shardsByAttributeMutex = new Object(); - public ShardIterator preferAttributesActiveShardsIt(String[] attributes, DiscoveryNodes nodes) { - return preferAttributesActiveShardsIt(attributes, nodes, counter.incrementAndGet()); - } - - public ShardIterator preferAttributesActiveShardsIt(String[] attributes, DiscoveryNodes nodes, int index) { - AttributesKey key = new AttributesKey(attributes); + private AttributesRoutings getActiveAttribute(AttributesKey key, DiscoveryNodes nodes) { AttributesRoutings shardRoutings = activeShardsByAttributes.get(key); if (shardRoutings == null) { synchronized (shardsByAttributeMutex) { ArrayList from = new ArrayList(activeShards); ArrayList to = new ArrayList(); - for (String attribute : attributes) { + for (String attribute : key.attributes) { String localAttributeValue = nodes.localNode().attributes().get(attribute); if (localAttributeValue == null) { continue; @@ -424,21 +440,53 @@ public class IndexShardRoutingTable implements Iterable { activeShardsByAttributes = MapBuilder.newMapBuilder(activeShardsByAttributes).put(key, shardRoutings).immutableMap(); } } + return shardRoutings; + } + + private AttributesRoutings getInitializingAttribute(AttributesKey key, DiscoveryNodes nodes) { + AttributesRoutings shardRoutings = initializingShardsByAttributes.get(key); + if (shardRoutings == null) { + synchronized (shardsByAttributeMutex) { + ArrayList from = new ArrayList(initializingShards); + ArrayList to = new ArrayList(); + for (String attribute : key.attributes) { + String localAttributeValue = nodes.localNode().attributes().get(attribute); + if (localAttributeValue == null) { + continue; + } + for (Iterator iterator = from.iterator(); iterator.hasNext(); ) { + ShardRouting fromShard = iterator.next(); + if (localAttributeValue.equals(nodes.get(fromShard.currentNodeId()).attributes().get(attribute))) { + iterator.remove(); + to.add(fromShard); + } + } + } + + shardRoutings = new AttributesRoutings(ImmutableList.copyOf(to), ImmutableList.copyOf(from)); + initializingShardsByAttributes = MapBuilder.newMapBuilder(initializingShardsByAttributes).put(key, shardRoutings).immutableMap(); + } + } + return shardRoutings; + } + + public ShardIterator preferAttributesActiveInitializingShardsIt(String[] attributes, DiscoveryNodes nodes) { + return preferAttributesActiveInitializingShardsIt(attributes, nodes, pickIndex()); + } + + public ShardIterator preferAttributesActiveInitializingShardsIt(String[] attributes, DiscoveryNodes nodes, int index) { + AttributesKey key = new AttributesKey(attributes); + AttributesRoutings activeRoutings = getActiveAttribute(key, nodes); + AttributesRoutings initializingRoutings = getInitializingAttribute(key, nodes); + // we now randomize, once between the ones that have the same attributes, and once for the ones that don't // we don't want to mix between the two! - ArrayList ordered = new ArrayList(shardRoutings.totalSize); + ArrayList ordered = new ArrayList(activeRoutings.totalSize + initializingRoutings.totalSize); index = Math.abs(index); - for (int i = 0; i < shardRoutings.withSameAttribute.size(); i++) { - int loc = (index + i) % shardRoutings.withSameAttribute.size(); - ShardRouting shardRouting = shardRoutings.withSameAttribute.get(loc); - ordered.add(shardRouting); - } - for (int i = 0; i < shardRoutings.withoutSameAttribute.size(); i++) { - int loc = (index + i) % shardRoutings.withoutSameAttribute.size(); - ShardRouting shardRouting = shardRoutings.withoutSameAttribute.get(loc); - ordered.add(shardRouting); - } - + addToListFromIndex(activeRoutings.withSameAttribute, ordered, index); + addToListFromIndex(activeRoutings.withoutSameAttribute, ordered, index); + addToListFromIndex(initializingRoutings.withSameAttribute, ordered, index); + addToListFromIndex(initializingRoutings.withoutSameAttribute, ordered, index); return new PlainShardIterator(shardId, ordered); } @@ -462,6 +510,23 @@ public class IndexShardRoutingTable implements Iterable { return shards; } + /** + * Adds from list to list, starting from the given index (wrapping around if needed). + */ + @SuppressWarnings("unchecked") + private void addToListFromIndex(List from, List to, int index) { + index = Math.abs(index); + for (int i = 0; i < from.size(); i++) { + int loc = (index + i) % from.size(); + to.add(from.get(loc)); + } + } + + // TODO: we can move to random based on ThreadLocalRandom, or make it pluggable + private int pickIndex() { + return Math.abs(counter.incrementAndGet()); + } + public static class Builder { private ShardId shardId; diff --git a/src/main/java/org/elasticsearch/cluster/routing/operation/plain/PlainOperationRouting.java b/src/main/java/org/elasticsearch/cluster/routing/operation/plain/PlainOperationRouting.java index b805de7020f..afc4dd38c13 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/operation/plain/PlainOperationRouting.java +++ b/src/main/java/org/elasticsearch/cluster/routing/operation/plain/PlainOperationRouting.java @@ -41,7 +41,6 @@ import org.elasticsearch.index.IndexShardMissingException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndexMissingException; -import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.Map; @@ -144,7 +143,7 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio return count; } } - + private static final Map> EMPTY_ROUTING = Collections.emptyMap(); @Override @@ -154,32 +153,32 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio } routing = routing == null ? EMPTY_ROUTING : routing; // just use an empty map final Set set = new HashSet(); - // we use set here and not list since we might get duplicates - for (String index : concreteIndices) { - final IndexRoutingTable indexRouting = indexRoutingTable(clusterState, index); - final Set effectiveRouting = routing.get(index); - if (effectiveRouting != null) { - for (String r : effectiveRouting) { - int shardId = shardId(clusterState, index, null, null, r); - IndexShardRoutingTable indexShard = indexRouting.shard(shardId); - if (indexShard == null) { - throw new IndexShardMissingException(new ShardId(index, shardId)); - } - // we might get duplicates, but that's ok, they will override one another - ShardIterator iterator = preferenceActiveShardIterator(indexShard, clusterState.nodes().localNodeId(), clusterState.nodes(), preference); - if (iterator != null) { - set.add(iterator); - } + // we use set here and not list since we might get duplicates + for (String index : concreteIndices) { + final IndexRoutingTable indexRouting = indexRoutingTable(clusterState, index); + final Set effectiveRouting = routing.get(index); + if (effectiveRouting != null) { + for (String r : effectiveRouting) { + int shardId = shardId(clusterState, index, null, null, r); + IndexShardRoutingTable indexShard = indexRouting.shard(shardId); + if (indexShard == null) { + throw new IndexShardMissingException(new ShardId(index, shardId)); } - } else { - for (IndexShardRoutingTable indexShard : indexRouting) { - ShardIterator iterator = preferenceActiveShardIterator(indexShard, clusterState.nodes().localNodeId(), clusterState.nodes(), preference); - if (iterator != null) { - set.add(iterator); - } + // we might get duplicates, but that's ok, they will override one another + ShardIterator iterator = preferenceActiveShardIterator(indexShard, clusterState.nodes().localNodeId(), clusterState.nodes(), preference); + if (iterator != null) { + set.add(iterator); + } + } + } else { + for (IndexShardRoutingTable indexShard : indexRouting) { + ShardIterator iterator = preferenceActiveShardIterator(indexShard, clusterState.nodes().localNodeId(), clusterState.nodes(), preference); + if (iterator != null) { + set.add(iterator); } } } + } return new GroupShardsIterator(set); } @@ -187,9 +186,9 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio if (preference == null) { String[] awarenessAttributes = awarenessAllocationDecider.awarenessAttributes(); if (awarenessAttributes.length == 0) { - return indexShard.activeShardsRandomIt(); + return indexShard.activeInitializingShardsRandomIt(); } else { - return indexShard.preferAttributesActiveShardsIt(awarenessAttributes, nodes); + return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes); } } if (preference.charAt(0) == '_') { @@ -217,9 +216,9 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio if (index == -1 || index == preference.length() - 1) { String[] awarenessAttributes = awarenessAllocationDecider.awarenessAttributes(); if (awarenessAttributes.length == 0) { - return indexShard.activeShardsRandomIt(); + return indexShard.activeInitializingShardsRandomIt(); } else { - return indexShard.preferAttributesActiveShardsIt(awarenessAttributes, nodes); + return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes); } } else { // update the preference and continue @@ -227,30 +226,30 @@ public class PlainOperationRouting extends AbstractComponent implements Operatio } } if (preference.startsWith("_prefer_node:")) { - return indexShard.preferNodeActiveShardsIt(preference.substring("_prefer_node:".length())); + return indexShard.preferNodeActiveInitializingShardsIt(preference.substring("_prefer_node:".length())); } if ("_local".equals(preference)) { - return indexShard.preferNodeActiveShardsIt(localNodeId); + return indexShard.preferNodeActiveInitializingShardsIt(localNodeId); } if ("_primary".equals(preference)) { - return indexShard.primaryActiveShardIt(); + return indexShard.primaryActiveInitializingShardIt(); } if ("_primary_first".equals(preference) || "_primaryFirst".equals(preference)) { - return indexShard.primaryFirstActiveShardsIt(); + return indexShard.primaryFirstActiveInitializingShardsIt(); } if ("_only_local".equals(preference) || "_onlyLocal".equals(preference)) { - return indexShard.onlyNodeActiveShardsIt(localNodeId); + return indexShard.onlyNodeActiveInitializingShardsIt(localNodeId); } if (preference.startsWith("_only_node:")) { - return indexShard.onlyNodeActiveShardsIt(preference.substring("_only_node:".length())); + return indexShard.onlyNodeActiveInitializingShardsIt(preference.substring("_only_node:".length())); } } // if not, then use it as the index String[] awarenessAttributes = awarenessAllocationDecider.awarenessAttributes(); if (awarenessAttributes.length == 0) { - return indexShard.activeShardsIt(DjbHashFunction.DJB_HASH(preference)); + return indexShard.activeInitializingShardsIt(DjbHashFunction.DJB_HASH(preference)); } else { - return indexShard.preferAttributesActiveShardsIt(awarenessAttributes, nodes, DjbHashFunction.DJB_HASH(preference)); + return indexShard.preferAttributesActiveInitializingShardsIt(awarenessAttributes, nodes, DjbHashFunction.DJB_HASH(preference)); } } diff --git a/src/test/java/org/elasticsearch/test/integration/indices/settings/UpdateNumberOfReplicasTests.java b/src/test/java/org/elasticsearch/test/integration/indices/settings/UpdateNumberOfReplicasTests.java index 549bc6dc7d6..48e5bdc76f5 100644 --- a/src/test/java/org/elasticsearch/test/integration/indices/settings/UpdateNumberOfReplicasTests.java +++ b/src/test/java/org/elasticsearch/test/integration/indices/settings/UpdateNumberOfReplicasTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.test.integration.indices.settings; -import org.apache.lucene.util.LuceneTestCase.AwaitsFix; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.count.CountResponse; @@ -38,23 +37,8 @@ import static org.hamcrest.Matchers.equalTo; */ public class UpdateNumberOfReplicasTests extends AbstractSharedClusterTest { - - /* - * Comment from Boaz on the dev@ list: - * - * A short update on the failing - * UpdateNumberOfReplicasTest.simpleUpdateNumberOfReplicasTests - Shay and I - * pinned down the source of the problem - it's caused by making searches - * based on dated knowledge of the cluster state and calling shards that - * have been relocating away in the mean time. - * - * I'll be working on a a fix (when searching on a shard that is in the - * process of relocating, fail over to the relocation target if the search - * to the relocation source failed), but will it take a couple of days to - * complete. - */ + @Test - @AwaitsFix(bugUrl = "Boaz is on it ;)") public void simpleUpdateNumberOfReplicasTests() throws Exception { logger.info("Creating index test"); prepareCreate("test", 2).execute().actionGet(); diff --git a/src/test/java/org/elasticsearch/test/integration/search/query/SimpleQueryTests.java b/src/test/java/org/elasticsearch/test/integration/search/query/SimpleQueryTests.java index 803f9f9b04d..a995c598951 100644 --- a/src/test/java/org/elasticsearch/test/integration/search/query/SimpleQueryTests.java +++ b/src/test/java/org/elasticsearch/test/integration/search/query/SimpleQueryTests.java @@ -109,7 +109,7 @@ public class SimpleQueryTests extends AbstractSharedClusterTest { try { client().prepareSearch().setQuery(QueryBuilders.matchQuery("field1", "quick brown").type(MatchQueryBuilder.Type.PHRASE).slop(0)).get(); } catch (SearchPhaseExecutionException e) { - assertTrue(e.getMessage().endsWith("IllegalStateException[field \"field1\" was indexed without position data; cannot run PhraseQuery (term=quick)]; }")); + assertTrue("wrong exception message " + e.getMessage(), e.getMessage().endsWith("IllegalStateException[field \"field1\" was indexed without position data; cannot run PhraseQuery (term=quick)]; }")); } } @@ -118,7 +118,7 @@ public class SimpleQueryTests extends AbstractSharedClusterTest { client().admin().indices().prepareCreate("test") .addMapping("type1", "field1", "type=string,analyzer=whitespace") .setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 1)).execute().actionGet(); - indexRandom("test", true, + indexRandom("test", true, client().prepareIndex("test", "type1", "3").setSource("field1", "quick lazy huge brown pidgin", "field2", "the quick lazy huge brown fox jumps over the tree"), client().prepareIndex("test", "type1", "1").setSource("field1", "the quick brown fox"), client().prepareIndex("test", "type1", "2").setSource("field1", "the quick lazy huge brown fox jumps over the tree") @@ -205,7 +205,7 @@ public class SimpleQueryTests extends AbstractSharedClusterTest { .addMapping("type1", "field1", "type=string,omit_term_freq_and_positions=true") .setSettings(ImmutableSettings.settingsBuilder().put("index.number_of_shards", 1)).get(); - indexRandom("test", true, + indexRandom("test", true, client().prepareIndex("test", "type1", "1").setSource("field1", "quick brown fox", "field2", "quick brown fox"), client().prepareIndex("test", "type1", "2").setSource("field1", "quick lazy huge brown fox", "field2", "quick lazy huge brown fox")); @@ -284,12 +284,12 @@ public class SimpleQueryTests extends AbstractSharedClusterTest { .startObject("_type").field("index", index).endObject() .endObject().endObject()) .execute().actionGet(); - indexRandom("test", true, - client().prepareIndex("test", "type1", "1").setSource("field1", "value1"), - client().prepareIndex("test", "type2", "1").setSource("field1", "value1"), - client().prepareIndex("test", "type1", "2").setSource("field1", "value1"), - client().prepareIndex("test", "type2", "2").setSource("field1", "value1"), - client().prepareIndex("test", "type2", "3").setSource("field1", "value1")); + indexRandom("test", true, + client().prepareIndex("test", "type1", "1").setSource("field1", "value1"), + client().prepareIndex("test", "type2", "1").setSource("field1", "value1"), + client().prepareIndex("test", "type1", "2").setSource("field1", "value1"), + client().prepareIndex("test", "type2", "2").setSource("field1", "value1"), + client().prepareIndex("test", "type2", "3").setSource("field1", "value1")); assertThat(client().prepareCount().setQuery(filteredQuery(matchAllQuery(), typeFilter("type1"))).execute().actionGet().getCount(), equalTo(2l)); assertThat(client().prepareCount().setQuery(filteredQuery(matchAllQuery(), typeFilter("type2"))).execute().actionGet().getCount(), equalTo(3l)); diff --git a/src/test/java/org/elasticsearch/test/unit/cluster/structure/RoutingIteratorTests.java b/src/test/java/org/elasticsearch/test/unit/cluster/structure/RoutingIteratorTests.java index 929c1e1d41f..a1b028e8e10 100644 --- a/src/test/java/org/elasticsearch/test/unit/cluster/structure/RoutingIteratorTests.java +++ b/src/test/java/org/elasticsearch/test/unit/cluster/structure/RoutingIteratorTests.java @@ -282,7 +282,7 @@ public class RoutingIteratorTests { clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); // after all are started, check routing iteration - ShardIterator shardIterator = clusterState.routingTable().index("test").shard(0).preferAttributesActiveShardsIt(new String[]{"rack_id"}, clusterState.nodes()); + ShardIterator shardIterator = clusterState.routingTable().index("test").shard(0).preferAttributesActiveInitializingShardsIt(new String[]{"rack_id"}, clusterState.nodes()); ShardRouting shardRouting = shardIterator.nextOrNull(); assertThat(shardRouting, notNullValue()); assertThat(shardRouting.currentNodeId(), equalTo("node1")); @@ -290,7 +290,7 @@ public class RoutingIteratorTests { assertThat(shardRouting, notNullValue()); assertThat(shardRouting.currentNodeId(), equalTo("node2")); - shardIterator = clusterState.routingTable().index("test").shard(0).preferAttributesActiveShardsIt(new String[]{"rack_id"}, clusterState.nodes()); + shardIterator = clusterState.routingTable().index("test").shard(0).preferAttributesActiveInitializingShardsIt(new String[]{"rack_id"}, clusterState.nodes()); shardRouting = shardIterator.nextOrNull(); assertThat(shardRouting, notNullValue()); assertThat(shardRouting.currentNodeId(), equalTo("node1"));