diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml
index fced27bd096..8d5c8bf2b26 100644
--- a/buildSrc/src/main/resources/checkstyle_suppressions.xml
+++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml
@@ -405,7 +405,6 @@
-  <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]TransportNodesListGatewayStartedShards.java" checks="LineLength" />
diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java
index d6ea00c0ee1..5dbac12f694 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java
@@ -29,7 +29,6 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterShardHealth;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
-import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
@@ -110,7 +109,7 @@ public class TransportIndicesShardStoresAction extends TransportMasterNodeReadAc
// we could fetch all shard store info from every node once (nNodes requests)
// we have to implement a TransportNodesAction instead of using TransportNodesListGatewayStartedShards
// for fetching shard stores info, that operates on a list of shards instead of a single shard
- new AsyncShardStoresInfoFetches(state.nodes(), routingNodes, state.metaData(), shardIdsToFetch, listener).start();
+ new AsyncShardStoresInfoFetches(state.nodes(), routingNodes, shardIdsToFetch, listener).start();
}
@Override
@@ -121,16 +120,14 @@ public class TransportIndicesShardStoresAction extends TransportMasterNodeReadAc
private class AsyncShardStoresInfoFetches {
private final DiscoveryNodes nodes;
private final RoutingNodes routingNodes;
- private final MetaData metaData;
private final Set<ShardId> shardIds;
private final ActionListener<IndicesShardStoresResponse> listener;
private CountDown expectedOps;
private final Queue<InternalAsyncFetch.Response> fetchResponses;
- AsyncShardStoresInfoFetches(DiscoveryNodes nodes, RoutingNodes routingNodes, MetaData metaData, Set<ShardId> shardIds, ActionListener<IndicesShardStoresResponse> listener) {
+ AsyncShardStoresInfoFetches(DiscoveryNodes nodes, RoutingNodes routingNodes, Set<ShardId> shardIds, ActionListener<IndicesShardStoresResponse> listener) {
this.nodes = nodes;
this.routingNodes = routingNodes;
- this.metaData = metaData;
this.shardIds = shardIds;
this.listener = listener;
this.fetchResponses = new ConcurrentLinkedQueue<>();
@@ -143,7 +140,7 @@ public class TransportIndicesShardStoresAction extends TransportMasterNodeReadAc
} else {
for (ShardId shardId : shardIds) {
InternalAsyncFetch fetch = new InternalAsyncFetch(logger, "shard_stores", shardId, listShardStoresInfo);
- fetch.fetchData(nodes, metaData, Collections.emptySet());
+ fetch.fetchData(nodes, Collections.emptySet());
}
}
}
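
Note on the TODO in doMasterOperation above (one fetch per shard vs. one request per node): a rough, self-contained sketch of the message-count trade-off the comment describes. The Batch type and all names in it are illustrative placeholders, not Elasticsearch APIs.

import java.util.List;
import java.util.Set;

// Illustrative only: compares today's per-shard fan-out with the batched
// TransportNodesAction the TODO proposes.
record Batch(Set<String> nodeIds, List<String> shardIds) {
    int perShardRequests() { return nodeIds.size() * shardIds.size(); } // current: one fetch per shard per node
    int batchedRequests()  { return nodeIds.size(); }                   // proposed: one request per node covering all shards
}
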
diff --git a/core/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java b/core/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java
index 51718350253..cd6268d04ec 100644
--- a/core/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java
+++ b/core/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java
@@ -25,8 +25,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
-import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@@ -60,7 +58,7 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
* An action that lists the relevant shard data that needs to be fetched.
*/
public interface List<NodesResponse extends BaseNodesResponse<NodeResponse>, NodeResponse extends BaseNodeResponse> {
- void list(ShardId shardId, IndexMetaData indexMetaData, String[] nodesIds, ActionListener<NodesResponse> listener);
+ void list(ShardId shardId, String[] nodesIds, ActionListener<NodesResponse> listener);
}
protected final ESLogger logger;
@@ -104,7 +102,7 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
* The ignoreNodes are nodes that are supposed to be ignored for this round, since fetching is async, we need
* to keep them around and make sure we add them back when all the responses are fetched and returned.
*/
- public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, MetaData metaData, Set<String> ignoreNodes) {
+ public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, Set<String> ignoreNodes) {
if (closed) {
throw new IllegalStateException(shardId + ": can't fetch data on closed async fetch");
}
@@ -121,7 +119,7 @@ public abstract class AsyncShardFetch implements Rel
for (NodeEntry<T> nodeEntry : nodesToFetch) {
nodesIds[index++] = nodeEntry.getNodeId();
}
- asyncFetch(shardId, nodesIds, metaData);
+ asyncFetch(shardId, nodesIds);
}
// if we are still fetching, return null to indicate it
@@ -268,10 +266,9 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
* Async fetches data for the provided shard with the set of nodes that need to be fetched from.
*/
// visible for testing
- void asyncFetch(final ShardId shardId, final String[] nodesIds, final MetaData metaData) {
- IndexMetaData indexMetaData = metaData.getIndexSafe(shardId.getIndex());
+ void asyncFetch(final ShardId shardId, final String[] nodesIds) {
logger.trace("{} fetching [{}] from {}", shardId, type, nodesIds);
- action.list(shardId, indexMetaData, nodesIds, new ActionListener<BaseNodesResponse<T>>() {
+ action.list(shardId, nodesIds, new ActionListener<BaseNodesResponse<T>>() {
@Override
public void onResponse(BaseNodesResponse<T> response) {
processAsyncFetch(shardId, response.getNodes(), response.failures());
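
For context, a minimal sketch of what an AsyncShardFetch.List implementation looks like after this change. Only the list(...) signature comes from the interface above; NodesFoo, NodeFoo and sendRequest are hypothetical stand-ins, so this compiles only inside the ES code base.

// Hypothetical implementation sketch, not part of this change.
class FooLister implements AsyncShardFetch.List<NodesFoo, NodeFoo> {
    @Override
    public void list(ShardId shardId, String[] nodesIds, ActionListener<NodesFoo> listener) {
        // IndexMetaData is no longer passed in: the receiving node resolves its
        // own copy (from cluster state, or from disk as a fallback)
        sendRequest(shardId, nodesIds, listener); // sendRequest is a placeholder
    }
}
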
diff --git a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java
index 7553a4b47a3..e76e8085e86 100644
--- a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java
+++ b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java
@@ -153,7 +153,7 @@ public class GatewayAllocator extends AbstractComponent {
asyncFetchStarted.put(shard.shardId(), fetch);
}
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState =
- fetch.fetchData(allocation.nodes(), allocation.metaData(), allocation.getIgnoreNodes(shard.shardId()));
+ fetch.fetchData(allocation.nodes(), allocation.getIgnoreNodes(shard.shardId()));
if (shardState.hasData() == true) {
shardState.processAllocation(allocation);
@@ -179,7 +179,7 @@ public class GatewayAllocator extends AbstractComponent {
asyncFetchStore.put(shard.shardId(), fetch);
}
AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> shardStores =
- fetch.fetchData(allocation.nodes(), allocation.metaData(), allocation.getIgnoreNodes(shard.shardId()));
+ fetch.fetchData(allocation.nodes(), allocation.getIgnoreNodes(shard.shardId()));
if (shardStores.hasData() == true) {
shardStores.processAllocation(allocation);
}
diff --git a/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java b/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java
index 7a090208818..bdeb6d1660f 100644
--- a/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java
+++ b/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java
@@ -57,24 +57,34 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
* We use this to find out which node holds the latest shard version and which of them used to be a primary in order to allocate
* shards after node or cluster restarts.
*/
-public class TransportNodesListGatewayStartedShards extends TransportNodesAction<TransportNodesListGatewayStartedShards.Request, TransportNodesListGatewayStartedShards.NodesGatewayStartedShards, TransportNodesListGatewayStartedShards.NodeRequest, TransportNodesListGatewayStartedShards.NodeGatewayStartedShards>
-    implements AsyncShardFetch.List<TransportNodesListGatewayStartedShards.NodesGatewayStartedShards, TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> {
+public class TransportNodesListGatewayStartedShards extends
+    TransportNodesAction<TransportNodesListGatewayStartedShards.Request,
+        TransportNodesListGatewayStartedShards.NodesGatewayStartedShards,
+        TransportNodesListGatewayStartedShards.NodeRequest,
+        TransportNodesListGatewayStartedShards.NodeGatewayStartedShards>
+    implements
+    AsyncShardFetch.List<TransportNodesListGatewayStartedShards.NodesGatewayStartedShards,
+        TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> {
public static final String ACTION_NAME = "internal:gateway/local/started_shards";
private final NodeEnvironment nodeEnv;
+
@Inject
public TransportNodesListGatewayStartedShards(Settings settings, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
- ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, NodeEnvironment env) {
- super(settings, ACTION_NAME, clusterName, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
- Request::new, NodeRequest::new, ThreadPool.Names.FETCH_SHARD_STARTED);
+ ActionFilters actionFilters,
+ IndexNameExpressionResolver indexNameExpressionResolver,
+ NodeEnvironment env) {
+ super(settings, ACTION_NAME, clusterName, threadPool, clusterService, transportService, actionFilters,
+ indexNameExpressionResolver, Request::new, NodeRequest::new, ThreadPool.Names.FETCH_SHARD_STARTED);
this.nodeEnv = env;
}
@Override
- public void list(ShardId shardId, IndexMetaData indexMetaData, String[] nodesIds, ActionListener<NodesGatewayStartedShards> listener) {
- execute(new Request(shardId, indexMetaData.getIndexUUID(), nodesIds), listener);
+ public void list(ShardId shardId, String[] nodesIds,
+ ActionListener<NodesGatewayStartedShards> listener) {
+ execute(new Request(shardId, nodesIds), listener);
}
@Override
@@ -110,47 +120,58 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
} else if (resp instanceof FailedNodeException) {
failures.add((FailedNodeException) resp);
} else {
- logger.warn("unknown response type [{}], expected NodeLocalGatewayStartedShards or FailedNodeException", resp);
+ logger.warn("unknown response type [{}], expected NodeLocalGatewayStartedShards or FailedNodeException",
+ resp);
}
}
- return new NodesGatewayStartedShards(clusterName, nodesList.toArray(new NodeGatewayStartedShards[nodesList.size()]),
- failures.toArray(new FailedNodeException[failures.size()]));
+ return new NodesGatewayStartedShards(clusterName,
+ nodesList.toArray(new NodeGatewayStartedShards[nodesList.size()]),
+ failures.toArray(new FailedNodeException[failures.size()]));
}
@Override
protected NodeGatewayStartedShards nodeOperation(NodeRequest request) {
try {
final ShardId shardId = request.getShardId();
- final String indexUUID = request.getIndexUUID();
logger.trace("{} loading local shard state info", shardId);
- ShardStateMetaData shardStateMetaData = ShardStateMetaData.FORMAT.loadLatestState(logger, nodeEnv.availableShardPaths(request.shardId));
+ ShardStateMetaData shardStateMetaData = ShardStateMetaData.FORMAT.loadLatestState(logger,
+ nodeEnv.availableShardPaths(request.shardId));
if (shardStateMetaData != null) {
- final IndexMetaData metaData = clusterService.state().metaData().index(shardId.getIndex()); // it's a mystery why this is sometimes null
- if (metaData != null) {
- ShardPath shardPath = null;
- try {
- IndexSettings indexSettings = new IndexSettings(metaData, settings);
- shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, indexSettings);
- if (shardPath == null) {
- throw new IllegalStateException(shardId + " no shard path found");
- }
- Store.tryOpenIndex(shardPath.resolveIndex(), shardId);
- } catch (Exception exception) {
- logger.trace("{} can't open index for shard [{}] in path [{}]", exception, shardId, shardStateMetaData, (shardPath != null) ? shardPath.resolveIndex() : "");
- String allocationId = shardStateMetaData.allocationId != null ? shardStateMetaData.allocationId.getId() : null;
- return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.legacyVersion, allocationId, shardStateMetaData.primary, exception);
+ IndexMetaData metaData = clusterService.state().metaData().index(shardId.getIndex());
+ if (metaData == null) {
+ // we may send these requests while processing the cluster state that recovered the index
+ // sometimes the request comes in before the local node processed that cluster state
+ // in such cases we can load it from disk
+ metaData = IndexMetaData.FORMAT.loadLatestState(logger, nodeEnv.indexPaths(shardId.getIndex()));
+ }
+ if (metaData == null) {
+ ElasticsearchException e = new ElasticsearchException("failed to find local IndexMetaData");
+ e.setShard(request.shardId);
+ throw e;
+ }
+
+ ShardPath shardPath = null;
+ try {
+ IndexSettings indexSettings = new IndexSettings(metaData, settings);
+ shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, indexSettings);
+ if (shardPath == null) {
+ throw new IllegalStateException(shardId + " no shard path found");
}
+ Store.tryOpenIndex(shardPath.resolveIndex(), shardId, logger);
+ } catch (Exception exception) {
+ logger.trace("{} can't open index for shard [{}] in path [{}]", exception, shardId,
+ shardStateMetaData, (shardPath != null) ? shardPath.resolveIndex() : "");
+ String allocationId = shardStateMetaData.allocationId != null ?
+ shardStateMetaData.allocationId.getId() : null;
+ return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.legacyVersion,
+ allocationId, shardStateMetaData.primary, exception);
}
- // old shard metadata doesn't have the actual index UUID so we need to check if the actual uuid in the metadata
- // is equal to IndexMetaData.INDEX_UUID_NA_VALUE otherwise this shard doesn't belong to the requested index.
- if (indexUUID.equals(shardStateMetaData.indexUUID) == false
- && IndexMetaData.INDEX_UUID_NA_VALUE.equals(shardStateMetaData.indexUUID) == false) {
- logger.warn("{} shard state info found but indexUUID didn't match expected [{}] actual [{}]", shardId, indexUUID, shardStateMetaData.indexUUID);
- } else {
- logger.debug("{} shard state info found: [{}]", shardId, shardStateMetaData);
- String allocationId = shardStateMetaData.allocationId != null ? shardStateMetaData.allocationId.getId() : null;
- return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.legacyVersion, allocationId, shardStateMetaData.primary);
- }
+
+ logger.debug("{} shard state info found: [{}]", shardId, shardStateMetaData);
+ String allocationId = shardStateMetaData.allocationId != null ?
+ shardStateMetaData.allocationId.getId() : null;
+ return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.legacyVersion,
+ allocationId, shardStateMetaData.primary);
}
logger.trace("{} no local shard info found", shardId);
return new NodeGatewayStartedShards(clusterService.localNode(), ShardStateMetaData.NO_VERSION, null, false);
@@ -167,15 +188,13 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
public static class Request extends BaseNodesRequest<Request> {
private ShardId shardId;
- private String indexUUID;
public Request() {
}
- public Request(ShardId shardId, String indexUUID, String[] nodesIds) {
+ public Request(ShardId shardId, String[] nodesIds) {
super(nodesIds);
this.shardId = shardId;
- this.indexUUID = indexUUID;
}
@@ -187,18 +206,12 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
- indexUUID = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardId.writeTo(out);
- out.writeString(indexUUID);
- }
-
- public String getIndexUUID() {
- return indexUUID;
}
}
@@ -206,7 +219,8 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
private FailedNodeException[] failures;
- public NodesGatewayStartedShards(ClusterName clusterName, NodeGatewayStartedShards[] nodes, FailedNodeException[] failures) {
+ public NodesGatewayStartedShards(ClusterName clusterName, NodeGatewayStartedShards[] nodes,
+ FailedNodeException[] failures) {
super(clusterName, nodes);
this.failures = failures;
}
@@ -240,38 +254,30 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
public static class NodeRequest extends BaseNodeRequest {
private ShardId shardId;
- private String indexUUID;
public NodeRequest() {
}
- NodeRequest(String nodeId, TransportNodesListGatewayStartedShards.Request request) {
+ NodeRequest(String nodeId, Request request) {
super(nodeId);
this.shardId = request.shardId();
- this.indexUUID = request.getIndexUUID();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
- indexUUID = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardId.writeTo(out);
- out.writeString(indexUUID);
}
public ShardId getShardId() {
return shardId;
}
-
- public String getIndexUUID() {
- return indexUUID;
- }
}
public static class NodeGatewayStartedShards extends BaseNodeResponse {
@@ -283,11 +289,13 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
public NodeGatewayStartedShards() {
}
+
public NodeGatewayStartedShards(DiscoveryNode node, long legacyVersion, String allocationId, boolean primary) {
this(node, legacyVersion, allocationId, primary, null);
}
- public NodeGatewayStartedShards(DiscoveryNode node, long legacyVersion, String allocationId, boolean primary, Throwable storeException) {
+ public NodeGatewayStartedShards(DiscoveryNode node, long legacyVersion, String allocationId, boolean primary,
+ Throwable storeException) {
super(node);
this.legacyVersion = legacyVersion;
this.allocationId = allocationId;
@@ -338,14 +346,24 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
@Override
public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
NodeGatewayStartedShards that = (NodeGatewayStartedShards) o;
- if (legacyVersion != that.legacyVersion) return false;
- if (primary != that.primary) return false;
- if (allocationId != null ? !allocationId.equals(that.allocationId) : that.allocationId != null) return false;
+ if (legacyVersion != that.legacyVersion) {
+ return false;
+ }
+ if (primary != that.primary) {
+ return false;
+ }
+ if (allocationId != null ? !allocationId.equals(that.allocationId) : that.allocationId != null) {
+ return false;
+ }
return storeException != null ? storeException.equals(that.storeException) : that.storeException == null;
}
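
The core behavioral change in nodeOperation above is the metadata lookup order: in-memory cluster state first, then the copy persisted on disk, and a hard failure only when both are missing. A self-contained sketch of that order, with illustrative names standing in for the ES types:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

// Illustrative stand-in for the new fallback order; not Elasticsearch code.
final class MetaDataLookup {
    static String load(Map<String, String> clusterStateView, Path onDiskCopy, String index) throws IOException {
        String meta = clusterStateView.get(index); // normal case: state already processed
        if (meta == null && Files.exists(onDiskCopy)) {
            // the fetch request raced ahead of cluster state processing; fall back to disk
            meta = Files.readString(onDiskCopy);
        }
        if (meta == null) {
            throw new IllegalStateException("failed to find local metadata for [" + index + "]");
        }
        return meta;
    }
}
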
diff --git a/core/src/main/java/org/elasticsearch/index/store/Store.java b/core/src/main/java/org/elasticsearch/index/store/Store.java
index e0ed3bc98b7..39c4bcfb3c1 100644
--- a/core/src/main/java/org/elasticsearch/index/store/Store.java
+++ b/core/src/main/java/org/elasticsearch/index/store/Store.java
@@ -422,7 +422,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
*/
public static boolean canOpenIndex(ESLogger logger, Path indexLocation, ShardId shardId) throws IOException {
try {
- tryOpenIndex(indexLocation, shardId);
+ tryOpenIndex(indexLocation, shardId, logger);
} catch (Exception ex) {
logger.trace("Can't open index for path [{}]", ex, indexLocation);
return false;
@@ -435,10 +435,11 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
* segment infos and possible corruption markers. If the index can not
* be opened, an exception is thrown
*/
- public static void tryOpenIndex(Path indexLocation, ShardId shardId) throws IOException {
+ public static void tryOpenIndex(Path indexLocation, ShardId shardId, ESLogger logger) throws IOException {
try (Directory dir = new SimpleFSDirectory(indexLocation)) {
failIfCorrupted(dir, shardId);
- Lucene.readSegmentInfos(dir);
+ SegmentInfos segInfo = Lucene.readSegmentInfos(dir);
+ logger.trace("{} loaded segment info [{}]", shardId, segInfo);
}
}
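
A short usage sketch of the new three-argument signature, mirroring how canOpenIndex above consumes it; indexLocation, shardId and logger are assumed to be in scope.

boolean openable;
try {
    Store.tryOpenIndex(indexLocation, shardId, logger); // now traces the loaded SegmentInfos
    openable = true;
} catch (Exception ex) {
    openable = false; // corruption marker found, or no readable segments file
}
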
diff --git a/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java
index 35a34ebea1b..e4a1709db55 100644
--- a/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java
+++ b/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java
@@ -84,7 +84,7 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction
- public void list(ShardId shardId, IndexMetaData indexMetaData, String[] nodesIds, ActionListener<NodesStoreFilesMetaData> listener) {
+ public void list(ShardId shardId, String[] nodesIds, ActionListener<NodesStoreFilesMetaData> listener) {
execute(new Request(shardId, false, nodesIds), listener);
}
diff --git a/core/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java b/core/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java
index c67156b5034..3ab15baf2e0 100644
--- a/core/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java
+++ b/core/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java
@@ -21,7 +21,6 @@ package org.elasticsearch.gateway;
import org.elasticsearch.Version;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
-import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.logging.Loggers;
@@ -75,7 +74,7 @@ public class AsyncShardFetchTests extends ESTestCase {
test.addSimulation(node1.getId(), response1);
// first fetch, no data, still on going
- AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
+ AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(0));
@@ -85,7 +84,7 @@ public class AsyncShardFetchTests extends ESTestCase {
assertThat(test.reroute.get(), equalTo(1));
test.close();
try {
- test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
+ test.fetchData(nodes, emptySet());
fail("fetch data should fail when closed");
} catch (IllegalStateException e) {
// all is well
@@ -97,7 +96,7 @@ public class AsyncShardFetchTests extends ESTestCase {
test.addSimulation(node1.getId(), response1);
// first fetch, no data, still on going
- AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
+ AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(0));
@@ -105,7 +104,7 @@ public class AsyncShardFetchTests extends ESTestCase {
test.fireSimulationAndWait(node1.getId());
// verify we get back the data node
assertThat(test.reroute.get(), equalTo(1));
- fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
+ fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1));
@@ -117,7 +116,7 @@ public class AsyncShardFetchTests extends ESTestCase {
test.addSimulation(node1.getId(), failure1);
// first fetch, no data, still on going
- AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
+ AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(0));
@@ -125,19 +124,19 @@ public class AsyncShardFetchTests extends ESTestCase {
test.fireSimulationAndWait(node1.getId());
// failure, fetched data exists, but has no data
assertThat(test.reroute.get(), equalTo(1));
- fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
+ fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(0));
// on failure, we reset the failure on a successive call to fetchData, and try again afterwards
test.addSimulation(node1.getId(), response1);
- fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
+ fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
test.fireSimulationAndWait(node1.getId());
// 2 reroutes, cause we have a failure that we clear
assertThat(test.reroute.get(), equalTo(3));
- fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
+ fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1));
@@ -149,7 +148,7 @@ public class AsyncShardFetchTests extends ESTestCase {
test.addSimulation(node2.getId(), response2);
// no fetched data, 2 requests still on going
- AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
+ AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(0));
@@ -157,14 +156,14 @@ public class AsyncShardFetchTests extends ESTestCase {
test.fireSimulationAndWait(node1.getId());
// there is still another on going request, so no data
assertThat(test.getNumberOfInFlightFetches(), equalTo(1));
- fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
+ fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
// fire the second simulation, this should allow us to get the data
test.fireSimulationAndWait(node2.getId());
// no more ongoing requests, we should fetch the data
assertThat(test.reroute.get(), equalTo(2));
- fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
+ fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(2));
assertThat(fetchData.getData().get(node1), sameInstance(response1));
@@ -177,21 +176,21 @@ public class AsyncShardFetchTests extends ESTestCase {
test.addSimulation(node2.getId(), failure2);
// no fetched data, 2 requests still on going
- AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
+ AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(0));
// fire the first response, it should trigger a reroute
test.fireSimulationAndWait(node1.getId());
assertThat(test.reroute.get(), equalTo(1));
- fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
+ fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
// fire the second simulation, this should allow us to get the data
test.fireSimulationAndWait(node2.getId());
assertThat(test.reroute.get(), equalTo(2));
// since one of those failed, we should only have one entry
- fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
+ fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1));
@@ -202,7 +201,7 @@ public class AsyncShardFetchTests extends ESTestCase {
test.addSimulation(node1.getId(), response1);
// no fetched data, 2 requests still on going
- AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
+ AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(0));
@@ -213,14 +212,14 @@ public class AsyncShardFetchTests extends ESTestCase {
nodes = DiscoveryNodes.builder(nodes).put(node2).build();
test.addSimulation(node2.getId(), response2);
// no fetch data, has a new node introduced
- fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
+ fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
// fire the second simulation, this should allow us to get the data
test.fireSimulationAndWait(node2.getId());
// since one of those failed, we should only have one entry
- fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
+ fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(2));
assertThat(fetchData.getData().get(node1), sameInstance(response1));
@@ -270,7 +269,7 @@ public class AsyncShardFetchTests extends ESTestCase {
}
@Override
- protected void asyncFetch(final ShardId shardId, String[] nodesIds, MetaData metaData) {
+ protected void asyncFetch(final ShardId shardId, String[] nodesIds) {
for (final String nodeId : nodesIds) {
threadPool.generic().execute(new Runnable() {
@Override
diff --git a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java
index ae4c6f4f5c1..702e83e7d55 100644
--- a/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java
+++ b/core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java
@@ -26,13 +26,19 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
+import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.env.Environment;
+import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
@@ -42,6 +48,9 @@ import org.elasticsearch.test.InternalTestCluster.RestartCallback;
import org.elasticsearch.test.store.MockFSDirectoryService;
import org.elasticsearch.test.store.MockFSIndexStore;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -54,10 +63,12 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
+import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
@ClusterScope(numDataNodes = 0, scope = Scope.TEST)
public class RecoveryFromGatewayIT extends ESIntegTestCase {
@@ -72,20 +83,20 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
internalCluster().startNode();
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type1")
- .startObject("properties").startObject("appAccountIds").field("type", "text").endObject().endObject()
- .endObject().endObject().string();
+ .startObject("properties").startObject("appAccountIds").field("type", "text").endObject().endObject()
+ .endObject().endObject().string();
assertAcked(prepareCreate("test").addMapping("type1", mapping));
client().prepareIndex("test", "type1", "10990239").setSource(jsonBuilder().startObject()
- .startArray("appAccountIds").value(14).value(179).endArray().endObject()).execute().actionGet();
+ .startArray("appAccountIds").value(14).value(179).endArray().endObject()).execute().actionGet();
client().prepareIndex("test", "type1", "10990473").setSource(jsonBuilder().startObject()
- .startArray("appAccountIds").value(14).endArray().endObject()).execute().actionGet();
+ .startArray("appAccountIds").value(14).endArray().endObject()).execute().actionGet();
client().prepareIndex("test", "type1", "10990513").setSource(jsonBuilder().startObject()
- .startArray("appAccountIds").value(14).value(179).endArray().endObject()).execute().actionGet();
+ .startArray("appAccountIds").value(14).value(179).endArray().endObject()).execute().actionGet();
client().prepareIndex("test", "type1", "10990695").setSource(jsonBuilder().startObject()
- .startArray("appAccountIds").value(14).endArray().endObject()).execute().actionGet();
+ .startArray("appAccountIds").value(14).endArray().endObject()).execute().actionGet();
client().prepareIndex("test", "type1", "11026351").setSource(jsonBuilder().startObject()
- .startArray("appAccountIds").value(14).endArray().endObject()).execute().actionGet();
+ .startArray("appAccountIds").value(14).endArray().endObject()).execute().actionGet();
refresh();
assertHitCount(client().prepareSearch().setSize(0).setQuery(termQuery("appAccountIds", 179)).execute().actionGet(), 2);
@@ -141,13 +152,13 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
internalCluster().startNode();
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type1")
- .startObject("properties").startObject("field").field("type", "text").endObject().startObject("num").field("type", "integer").endObject().endObject()
- .endObject().endObject().string();
+ .startObject("properties").startObject("field").field("type", "text").endObject().startObject("num").field("type", "integer").endObject().endObject()
+ .endObject().endObject().string();
// note: default replica settings are tied to #data nodes-1 which is 0 here. We can do with 1 in this test.
int numberOfShards = numberOfShards();
assertAcked(prepareCreate("test").setSettings(
- SETTING_NUMBER_OF_SHARDS, numberOfShards(),
- SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 1)
+ SETTING_NUMBER_OF_SHARDS, numberOfShards(),
+ SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 1)
).addMapping("type1", mapping));
int value1Docs;
@@ -170,12 +181,12 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
for (int id = 0; id < Math.max(value1Docs, value2Docs); id++) {
if (id < value1Docs) {
index("test", "type1", "1_" + id,
- jsonBuilder().startObject().field("field", "value1").startArray("num").value(14).value(179).endArray().endObject()
+ jsonBuilder().startObject().field("field", "value1").startArray("num").value(14).value(179).endArray().endObject()
);
}
if (id < value2Docs) {
index("test", "type1", "2_" + id,
- jsonBuilder().startObject().field("field", "value2").startArray("num").value(14).endArray().endObject()
+ jsonBuilder().startObject().field("field", "value2").startArray("num").value(14).endArray().endObject()
);
}
}
@@ -341,16 +352,16 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
logger.info("--> add some metadata, additional type and template");
client.admin().indices().preparePutMapping("test").setType("type2")
- .setSource(jsonBuilder().startObject().startObject("type2").endObject().endObject())
- .execute().actionGet();
+ .setSource(jsonBuilder().startObject().startObject("type2").endObject().endObject())
+ .execute().actionGet();
client.admin().indices().preparePutTemplate("template_1")
- .setTemplate("te*")
- .setOrder(0)
- .addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("properties")
- .startObject("field1").field("type", "text").field("store", true).endObject()
- .startObject("field2").field("type", "keyword").field("store", true).endObject()
- .endObject().endObject().endObject())
- .execute().actionGet();
+ .setTemplate("te*")
+ .setOrder(0)
+ .addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("properties")
+ .startObject("field1").field("type", "text").field("store", true).endObject()
+ .startObject("field2").field("type", "keyword").field("store", true).endObject()
+ .endObject().endObject().endObject())
+ .execute().actionGet();
client.admin().indices().prepareAliases().addAlias("test", "test_alias", QueryBuilders.termQuery("field", "value")).execute().actionGet();
logger.info("--> starting two nodes back, verifying we got the latest version");
}
@@ -378,19 +389,19 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
public void testReusePeerRecovery() throws Exception {
final Settings settings = Settings.builder()
- .put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false)
- .put("gateway.recover_after_nodes", 4)
- .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 4)
- .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 4)
- .put(MockFSDirectoryService.CRASH_INDEX_SETTING.getKey(), false).build();
+ .put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false)
+ .put("gateway.recover_after_nodes", 4)
+ .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 4)
+ .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 4)
+ .put(MockFSDirectoryService.CRASH_INDEX_SETTING.getKey(), false).build();
internalCluster().startNodesAsync(4, settings).get();
// prevent any rebalance actions during the peer recovery
// if we run into a relocation the reuse count will be 0 and this fails the test. We are testing here if
// we reuse the files on disk after full restarts for replicas.
assertAcked(prepareCreate("test").setSettings(Settings.builder()
- .put(indexSettings())
- .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)));
+ .put(indexSettings())
+ .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)));
ensureGreen();
logger.info("--> indexing docs");
for (int i = 0; i < 1000; i++) {
@@ -413,9 +424,9 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
// Disable allocations while we are closing nodes
client().admin().cluster().prepareUpdateSettings()
- .setTransientSettings(Settings.builder()
- .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE))
- .get();
+ .setTransientSettings(Settings.builder()
+ .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE))
+ .get();
logger.info("--> full cluster restart");
internalCluster().fullRestart();
@@ -430,9 +441,9 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
logger.info("--> disabling allocation while the cluster is shut down{}", useSyncIds ? "" : " a second time");
// Disable allocations while we are closing nodes
client().admin().cluster().prepareUpdateSettings()
- .setTransientSettings(Settings.builder()
- .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE))
- .get();
+ .setTransientSettings(Settings.builder()
+ .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE))
+ .get();
Map<String, long[]> primaryTerms = assertAndCapturePrimaryTerms(null);
@@ -456,8 +467,8 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
}
if (!recoveryState.getPrimary() && (useSyncIds == false)) {
logger.info("--> replica shard {} recovered from {} to {}, recovered {}, reuse {}",
- recoveryState.getShardId().getId(), recoveryState.getSourceNode().getName(), recoveryState.getTargetNode().getName(),
- recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes());
+ recoveryState.getShardId().getId(), recoveryState.getSourceNode().getName(), recoveryState.getTargetNode().getName(),
+ recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes());
assertThat("no bytes should be recovered", recoveryState.getIndex().recoveredBytes(), equalTo(recovered));
assertThat("data should have been reused", recoveryState.getIndex().reusedBytes(), greaterThan(0L));
// we have to recover the segments file since we commit the translog ID on engine startup
@@ -468,8 +479,8 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
} else {
if (useSyncIds && !recoveryState.getPrimary()) {
logger.info("--> replica shard {} recovered from {} to {} using sync id, recovered {}, reuse {}",
- recoveryState.getShardId().getId(), recoveryState.getSourceNode().getName(), recoveryState.getTargetNode().getName(),
- recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes());
+ recoveryState.getShardId().getId(), recoveryState.getSourceNode().getName(), recoveryState.getTargetNode().getName(),
+ recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes());
}
assertThat(recoveryState.getIndex().recoveredBytes(), equalTo(0L));
assertThat(recoveryState.getIndex().reusedBytes(), equalTo(recoveryState.getIndex().totalBytes()));
@@ -514,4 +525,58 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
assertHitCount(client().prepareSearch("test").setSize(0).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(), 1);
}
+ public void testStartedShardFoundIfStateNotYetProcessed() throws Exception {
+ // nodes may need to report shard state before they have processed the cluster state recovered from the master
+ final String nodeName = internalCluster().startNode();
+ assertAcked(prepareCreate("test").setSettings(SETTING_NUMBER_OF_SHARDS, 1));
+ final Index index = resolveIndex("test");
+ final ShardId shardId = new ShardId(index, 0);
+ index("test", "type", "1");
+ flush("test");
+
+ final boolean corrupt = randomBoolean();
+
+ internalCluster().fullRestart(new RestartCallback() {
+ @Override
+ public Settings onNodeStopped(String nodeName) throws Exception {
+ // make sure state is not recovered
+ return Settings.builder().put(GatewayService.RECOVER_AFTER_NODES_SETTING.getKey(), 2).build();
+ }
+ });
+
+ if (corrupt) {
+ for (Path path : internalCluster().getInstance(NodeEnvironment.class, nodeName).availableShardPaths(shardId)) {
+ final Path indexPath = path.resolve(ShardPath.INDEX_FOLDER_NAME);
+ if (Files.exists(indexPath)) { // multi data path might only have one path in use
+ try (DirectoryStream<Path> stream = Files.newDirectoryStream(indexPath)) {
+ for (Path item : stream) {
+ if (item.getFileName().toString().startsWith("segments_")) {
+ logger.debug("--> deleting [{}]", item);
+ Files.delete(item);
+ }
+ }
+ }
+ }
+
+ }
+ }
+
+ DiscoveryNode node = internalCluster().getInstance(ClusterService.class, nodeName).localNode();
+
+ TransportNodesListGatewayStartedShards.NodesGatewayStartedShards response;
+ response = internalCluster().getInstance(TransportNodesListGatewayStartedShards.class)
+ .execute(new TransportNodesListGatewayStartedShards.Request(shardId, new String[]{node.getId()}))
+ .get();
+
+ assertThat(response.getNodes(), arrayWithSize(1));
+ assertThat(response.getNodes()[0].allocationId(), notNullValue());
+ if (corrupt) {
+ assertThat(response.getNodes()[0].storeException(), notNullValue());
+ } else {
+ assertThat(response.getNodes()[0].storeException(), nullValue());
+ }
+
+ // start another node so cluster consistency checks won't time out due to the lack of state
+ internalCluster().startNode();
+ }
}
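
The corruption step in the new test works by removing the Lucene segments_N files so the shard store can no longer be opened, which is what drives the storeException assertion in the corrupt branch. A self-contained sketch of that step; indexPath is assumed to point at a shard's index directory.

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

final class SegmentsFileRemover {
    // Deleting every segments_N file makes the index unopenable by Lucene.
    static void deleteSegmentsFiles(Path indexPath) throws IOException {
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(indexPath, "segments_*")) {
            for (Path item : stream) {
                Files.delete(item);
            }
        }
    }
}
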