TransportNodesListGatewayStartedShards should fall back to disk based index metadata if not found in cluster state (#17663)

When an index is recovered from disk it's metadata is imported first and the master reaches out to the nodes looking for shards of that index. Sometimes those requests reach other nodes before the cluster state is processed by them. At the moment, that situation disables the checking of the store, which requires the meta data (indices with custom path need to know where the data is). When corruption hits this means we may assign a shard to node with corrupted store, which will be caught later on but causes confusion. Instead we can try loading the meta data from disk in those cases.

Relates to #17630
This commit is contained in:
Boaz Leskes 2016-04-12 18:41:00 +02:00
parent 9567f154e6
commit 5af6982338
9 changed files with 216 additions and 140 deletions

View File

@ -405,7 +405,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]PrimaryShardAllocator.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]ReplicaShardAllocator.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]TransportNodesListGatewayMetaState.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]TransportNodesListGatewayStartedShards.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]http[/\\]HttpTransportSettings.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]http[/\\]netty[/\\]HttpRequestHandler.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]http[/\\]netty[/\\]NettyHttpChannel.java" checks="LineLength" />

View File

@ -29,7 +29,6 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterShardHealth;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
@ -110,7 +109,7 @@ public class TransportIndicesShardStoresAction extends TransportMasterNodeReadAc
// we could fetch all shard store info from every node once (nNodes requests)
// we have to implement a TransportNodesAction instead of using TransportNodesListGatewayStartedShards
// for fetching shard stores info, that operates on a list of shards instead of a single shard
new AsyncShardStoresInfoFetches(state.nodes(), routingNodes, state.metaData(), shardIdsToFetch, listener).start();
new AsyncShardStoresInfoFetches(state.nodes(), routingNodes, shardIdsToFetch, listener).start();
}
@Override
@ -121,16 +120,14 @@ public class TransportIndicesShardStoresAction extends TransportMasterNodeReadAc
private class AsyncShardStoresInfoFetches {
private final DiscoveryNodes nodes;
private final RoutingNodes routingNodes;
private final MetaData metaData;
private final Set<ShardId> shardIds;
private final ActionListener<IndicesShardStoresResponse> listener;
private CountDown expectedOps;
private final Queue<InternalAsyncFetch.Response> fetchResponses;
AsyncShardStoresInfoFetches(DiscoveryNodes nodes, RoutingNodes routingNodes, MetaData metaData, Set<ShardId> shardIds, ActionListener<IndicesShardStoresResponse> listener) {
AsyncShardStoresInfoFetches(DiscoveryNodes nodes, RoutingNodes routingNodes, Set<ShardId> shardIds, ActionListener<IndicesShardStoresResponse> listener) {
this.nodes = nodes;
this.routingNodes = routingNodes;
this.metaData = metaData;
this.shardIds = shardIds;
this.listener = listener;
this.fetchResponses = new ConcurrentLinkedQueue<>();
@ -143,7 +140,7 @@ public class TransportIndicesShardStoresAction extends TransportMasterNodeReadAc
} else {
for (ShardId shardId : shardIds) {
InternalAsyncFetch fetch = new InternalAsyncFetch(logger, "shard_stores", shardId, listShardStoresInfo);
fetch.fetchData(nodes, metaData, Collections.<String>emptySet());
fetch.fetchData(nodes, Collections.<String>emptySet());
}
}
}

View File

@ -25,8 +25,6 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@ -60,7 +58,7 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
* An action that lists the relevant shard data that needs to be fetched.
*/
public interface List<NodesResponse extends BaseNodesResponse<NodeResponse>, NodeResponse extends BaseNodeResponse> {
void list(ShardId shardId, IndexMetaData indexMetaData, String[] nodesIds, ActionListener<NodesResponse> listener);
void list(ShardId shardId, String[] nodesIds, ActionListener<NodesResponse> listener);
}
protected final ESLogger logger;
@ -104,7 +102,7 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
* The ignoreNodes are nodes that are supposed to be ignored for this round, since fetching is async, we need
* to keep them around and make sure we add them back when all the responses are fetched and returned.
*/
public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, MetaData metaData, Set<String> ignoreNodes) {
public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, Set<String> ignoreNodes) {
if (closed) {
throw new IllegalStateException(shardId + ": can't fetch data on closed async fetch");
}
@ -121,7 +119,7 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
for (NodeEntry<T> nodeEntry : nodesToFetch) {
nodesIds[index++] = nodeEntry.getNodeId();
}
asyncFetch(shardId, nodesIds, metaData);
asyncFetch(shardId, nodesIds);
}
// if we are still fetching, return null to indicate it
@ -268,10 +266,9 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
* Async fetches data for the provided shard with the set of nodes that need to be fetched from.
*/
// visible for testing
void asyncFetch(final ShardId shardId, final String[] nodesIds, final MetaData metaData) {
IndexMetaData indexMetaData = metaData.getIndexSafe(shardId.getIndex());
void asyncFetch(final ShardId shardId, final String[] nodesIds) {
logger.trace("{} fetching [{}] from {}", shardId, type, nodesIds);
action.list(shardId, indexMetaData, nodesIds, new ActionListener<BaseNodesResponse<T>>() {
action.list(shardId, nodesIds, new ActionListener<BaseNodesResponse<T>>() {
@Override
public void onResponse(BaseNodesResponse<T> response) {
processAsyncFetch(shardId, response.getNodes(), response.failures());

View File

@ -153,7 +153,7 @@ public class GatewayAllocator extends AbstractComponent {
asyncFetchStarted.put(shard.shardId(), fetch);
}
AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState =
fetch.fetchData(allocation.nodes(), allocation.metaData(), allocation.getIgnoreNodes(shard.shardId()));
fetch.fetchData(allocation.nodes(), allocation.getIgnoreNodes(shard.shardId()));
if (shardState.hasData() == true) {
shardState.processAllocation(allocation);
@ -179,7 +179,7 @@ public class GatewayAllocator extends AbstractComponent {
asyncFetchStore.put(shard.shardId(), fetch);
}
AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> shardStores =
fetch.fetchData(allocation.nodes(), allocation.metaData(), allocation.getIgnoreNodes(shard.shardId()));
fetch.fetchData(allocation.nodes(), allocation.getIgnoreNodes(shard.shardId()));
if (shardStores.hasData() == true) {
shardStores.processAllocation(allocation);
}

View File

@ -57,24 +57,34 @@ import java.util.concurrent.atomic.AtomicReferenceArray;
* We use this to find out which node holds the latest shard version and which of them used to be a primary in order to allocate
* shards after node or cluster restarts.
*/
public class TransportNodesListGatewayStartedShards extends TransportNodesAction<TransportNodesListGatewayStartedShards.Request, TransportNodesListGatewayStartedShards.NodesGatewayStartedShards, TransportNodesListGatewayStartedShards.NodeRequest, TransportNodesListGatewayStartedShards.NodeGatewayStartedShards>
implements AsyncShardFetch.List<TransportNodesListGatewayStartedShards.NodesGatewayStartedShards, TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> {
public class TransportNodesListGatewayStartedShards extends
TransportNodesAction<TransportNodesListGatewayStartedShards.Request,
TransportNodesListGatewayStartedShards.NodesGatewayStartedShards,
TransportNodesListGatewayStartedShards.NodeRequest,
TransportNodesListGatewayStartedShards.NodeGatewayStartedShards>
implements
AsyncShardFetch.List<TransportNodesListGatewayStartedShards.NodesGatewayStartedShards,
TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> {
public static final String ACTION_NAME = "internal:gateway/local/started_shards";
private final NodeEnvironment nodeEnv;
@Inject
public TransportNodesListGatewayStartedShards(Settings settings, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, NodeEnvironment env) {
super(settings, ACTION_NAME, clusterName, threadPool, clusterService, transportService, actionFilters, indexNameExpressionResolver,
Request::new, NodeRequest::new, ThreadPool.Names.FETCH_SHARD_STARTED);
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
NodeEnvironment env) {
super(settings, ACTION_NAME, clusterName, threadPool, clusterService, transportService, actionFilters,
indexNameExpressionResolver, Request::new, NodeRequest::new, ThreadPool.Names.FETCH_SHARD_STARTED);
this.nodeEnv = env;
}
@Override
public void list(ShardId shardId, IndexMetaData indexMetaData, String[] nodesIds, ActionListener<NodesGatewayStartedShards> listener) {
execute(new Request(shardId, indexMetaData.getIndexUUID(), nodesIds), listener);
public void list(ShardId shardId, String[] nodesIds,
ActionListener<NodesGatewayStartedShards> listener) {
execute(new Request(shardId, nodesIds), listener);
}
@Override
@ -110,47 +120,58 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
} else if (resp instanceof FailedNodeException) {
failures.add((FailedNodeException) resp);
} else {
logger.warn("unknown response type [{}], expected NodeLocalGatewayStartedShards or FailedNodeException", resp);
logger.warn("unknown response type [{}], expected NodeLocalGatewayStartedShards or FailedNodeException",
resp);
}
}
return new NodesGatewayStartedShards(clusterName, nodesList.toArray(new NodeGatewayStartedShards[nodesList.size()]),
failures.toArray(new FailedNodeException[failures.size()]));
return new NodesGatewayStartedShards(clusterName,
nodesList.toArray(new NodeGatewayStartedShards[nodesList.size()]),
failures.toArray(new FailedNodeException[failures.size()]));
}
@Override
protected NodeGatewayStartedShards nodeOperation(NodeRequest request) {
try {
final ShardId shardId = request.getShardId();
final String indexUUID = request.getIndexUUID();
logger.trace("{} loading local shard state info", shardId);
ShardStateMetaData shardStateMetaData = ShardStateMetaData.FORMAT.loadLatestState(logger, nodeEnv.availableShardPaths(request.shardId));
ShardStateMetaData shardStateMetaData = ShardStateMetaData.FORMAT.loadLatestState(logger,
nodeEnv.availableShardPaths(request.shardId));
if (shardStateMetaData != null) {
final IndexMetaData metaData = clusterService.state().metaData().index(shardId.getIndex()); // it's a mystery why this is sometimes null
if (metaData != null) {
ShardPath shardPath = null;
try {
IndexSettings indexSettings = new IndexSettings(metaData, settings);
shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, indexSettings);
if (shardPath == null) {
throw new IllegalStateException(shardId + " no shard path found");
}
Store.tryOpenIndex(shardPath.resolveIndex(), shardId);
} catch (Exception exception) {
logger.trace("{} can't open index for shard [{}] in path [{}]", exception, shardId, shardStateMetaData, (shardPath != null) ? shardPath.resolveIndex() : "");
String allocationId = shardStateMetaData.allocationId != null ? shardStateMetaData.allocationId.getId() : null;
return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.legacyVersion, allocationId, shardStateMetaData.primary, exception);
IndexMetaData metaData = clusterService.state().metaData().index(shardId.getIndex());
if (metaData == null) {
// we may send this requests while processing the cluster state that recovered the index
// sometimes the request comes in before the local node processed that cluster state
// in such cases we can load it from disk
metaData = IndexMetaData.FORMAT.loadLatestState(logger, nodeEnv.indexPaths(shardId.getIndex()));
}
if (metaData == null) {
ElasticsearchException e = new ElasticsearchException("failed to find local IndexMetaData");
e.setShard(request.shardId);
throw e;
}
ShardPath shardPath = null;
try {
IndexSettings indexSettings = new IndexSettings(metaData, settings);
shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, indexSettings);
if (shardPath == null) {
throw new IllegalStateException(shardId + " no shard path found");
}
Store.tryOpenIndex(shardPath.resolveIndex(), shardId, logger);
} catch (Exception exception) {
logger.trace("{} can't open index for shard [{}] in path [{}]", exception, shardId,
shardStateMetaData, (shardPath != null) ? shardPath.resolveIndex() : "");
String allocationId = shardStateMetaData.allocationId != null ?
shardStateMetaData.allocationId.getId() : null;
return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.legacyVersion,
allocationId, shardStateMetaData.primary, exception);
}
// old shard metadata doesn't have the actual index UUID so we need to check if the actual uuid in the metadata
// is equal to IndexMetaData.INDEX_UUID_NA_VALUE otherwise this shard doesn't belong to the requested index.
if (indexUUID.equals(shardStateMetaData.indexUUID) == false
&& IndexMetaData.INDEX_UUID_NA_VALUE.equals(shardStateMetaData.indexUUID) == false) {
logger.warn("{} shard state info found but indexUUID didn't match expected [{}] actual [{}]", shardId, indexUUID, shardStateMetaData.indexUUID);
} else {
logger.debug("{} shard state info found: [{}]", shardId, shardStateMetaData);
String allocationId = shardStateMetaData.allocationId != null ? shardStateMetaData.allocationId.getId() : null;
return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.legacyVersion, allocationId, shardStateMetaData.primary);
}
logger.debug("{} shard state info found: [{}]", shardId, shardStateMetaData);
String allocationId = shardStateMetaData.allocationId != null ?
shardStateMetaData.allocationId.getId() : null;
return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.legacyVersion,
allocationId, shardStateMetaData.primary);
}
logger.trace("{} no local shard info found", shardId);
return new NodeGatewayStartedShards(clusterService.localNode(), ShardStateMetaData.NO_VERSION, null, false);
@ -167,15 +188,13 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
public static class Request extends BaseNodesRequest<Request> {
private ShardId shardId;
private String indexUUID;
public Request() {
}
public Request(ShardId shardId, String indexUUID, String[] nodesIds) {
public Request(ShardId shardId, String[] nodesIds) {
super(nodesIds);
this.shardId = shardId;
this.indexUUID = indexUUID;
}
@ -187,18 +206,12 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
indexUUID = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardId.writeTo(out);
out.writeString(indexUUID);
}
public String getIndexUUID() {
return indexUUID;
}
}
@ -206,7 +219,8 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
private FailedNodeException[] failures;
public NodesGatewayStartedShards(ClusterName clusterName, NodeGatewayStartedShards[] nodes, FailedNodeException[] failures) {
public NodesGatewayStartedShards(ClusterName clusterName, NodeGatewayStartedShards[] nodes,
FailedNodeException[] failures) {
super(clusterName, nodes);
this.failures = failures;
}
@ -240,38 +254,30 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
public static class NodeRequest extends BaseNodeRequest {
private ShardId shardId;
private String indexUUID;
public NodeRequest() {
}
NodeRequest(String nodeId, TransportNodesListGatewayStartedShards.Request request) {
NodeRequest(String nodeId, Request request) {
super(nodeId);
this.shardId = request.shardId();
this.indexUUID = request.getIndexUUID();
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardId = ShardId.readShardId(in);
indexUUID = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardId.writeTo(out);
out.writeString(indexUUID);
}
public ShardId getShardId() {
return shardId;
}
public String getIndexUUID() {
return indexUUID;
}
}
public static class NodeGatewayStartedShards extends BaseNodeResponse {
@ -283,11 +289,13 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
public NodeGatewayStartedShards() {
}
public NodeGatewayStartedShards(DiscoveryNode node, long legacyVersion, String allocationId, boolean primary) {
this(node, legacyVersion, allocationId, primary, null);
}
public NodeGatewayStartedShards(DiscoveryNode node, long legacyVersion, String allocationId, boolean primary, Throwable storeException) {
public NodeGatewayStartedShards(DiscoveryNode node, long legacyVersion, String allocationId, boolean primary,
Throwable storeException) {
super(node);
this.legacyVersion = legacyVersion;
this.allocationId = allocationId;
@ -338,14 +346,24 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
NodeGatewayStartedShards that = (NodeGatewayStartedShards) o;
if (legacyVersion != that.legacyVersion) return false;
if (primary != that.primary) return false;
if (allocationId != null ? !allocationId.equals(that.allocationId) : that.allocationId != null) return false;
if (legacyVersion != that.legacyVersion) {
return false;
}
if (primary != that.primary) {
return false;
}
if (allocationId != null ? !allocationId.equals(that.allocationId) : that.allocationId != null) {
return false;
}
return storeException != null ? storeException.equals(that.storeException) : that.storeException == null;
}

View File

@ -422,7 +422,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
*/
public static boolean canOpenIndex(ESLogger logger, Path indexLocation, ShardId shardId) throws IOException {
try {
tryOpenIndex(indexLocation, shardId);
tryOpenIndex(indexLocation, shardId, logger);
} catch (Exception ex) {
logger.trace("Can't open index for path [{}]", ex, indexLocation);
return false;
@ -435,10 +435,11 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
* segment infos and possible corruption markers. If the index can not
* be opened, an exception is thrown
*/
public static void tryOpenIndex(Path indexLocation, ShardId shardId) throws IOException {
public static void tryOpenIndex(Path indexLocation, ShardId shardId, ESLogger logger) throws IOException {
try (Directory dir = new SimpleFSDirectory(indexLocation)) {
failIfCorrupted(dir, shardId);
Lucene.readSegmentInfos(dir);
SegmentInfos segInfo = Lucene.readSegmentInfos(dir);
logger.trace("{} loaded segment info [{}]", shardId, segInfo);
}
}

View File

@ -84,7 +84,7 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
}
@Override
public void list(ShardId shardId, IndexMetaData indexMetaData, String[] nodesIds, ActionListener<NodesStoreFilesMetaData> listener) {
public void list(ShardId shardId, String[] nodesIds, ActionListener<NodesStoreFilesMetaData> listener) {
execute(new Request(shardId, false, nodesIds), listener);
}

View File

@ -21,7 +21,6 @@ package org.elasticsearch.gateway;
import org.elasticsearch.Version;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.logging.Loggers;
@ -75,7 +74,7 @@ public class AsyncShardFetchTests extends ESTestCase {
test.addSimulation(node1.getId(), response1);
// first fetch, no data, still on going
AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(0));
@ -85,7 +84,7 @@ public class AsyncShardFetchTests extends ESTestCase {
assertThat(test.reroute.get(), equalTo(1));
test.close();
try {
test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
test.fetchData(nodes, emptySet());
fail("fetch data should fail when closed");
} catch (IllegalStateException e) {
// all is well
@ -97,7 +96,7 @@ public class AsyncShardFetchTests extends ESTestCase {
test.addSimulation(node1.getId(), response1);
// first fetch, no data, still on going
AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(0));
@ -105,7 +104,7 @@ public class AsyncShardFetchTests extends ESTestCase {
test.fireSimulationAndWait(node1.getId());
// verify we get back the data node
assertThat(test.reroute.get(), equalTo(1));
fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1));
@ -117,7 +116,7 @@ public class AsyncShardFetchTests extends ESTestCase {
test.addSimulation(node1.getId(), failure1);
// first fetch, no data, still on going
AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(0));
@ -125,19 +124,19 @@ public class AsyncShardFetchTests extends ESTestCase {
test.fireSimulationAndWait(node1.getId());
// failure, fetched data exists, but has no data
assertThat(test.reroute.get(), equalTo(1));
fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(0));
// on failure, we reset the failure on a successive call to fetchData, and try again afterwards
test.addSimulation(node1.getId(), response1);
fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
test.fireSimulationAndWait(node1.getId());
// 2 reroutes, cause we have a failure that we clear
assertThat(test.reroute.get(), equalTo(3));
fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1));
@ -149,7 +148,7 @@ public class AsyncShardFetchTests extends ESTestCase {
test.addSimulation(node2.getId(), response2);
// no fetched data, 2 requests still on going
AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(0));
@ -157,14 +156,14 @@ public class AsyncShardFetchTests extends ESTestCase {
test.fireSimulationAndWait(node1.getId());
// there is still another on going request, so no data
assertThat(test.getNumberOfInFlightFetches(), equalTo(1));
fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
// fire the second simulation, this should allow us to get the data
test.fireSimulationAndWait(node2.getId());
// no more ongoing requests, we should fetch the data
assertThat(test.reroute.get(), equalTo(2));
fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(2));
assertThat(fetchData.getData().get(node1), sameInstance(response1));
@ -177,21 +176,21 @@ public class AsyncShardFetchTests extends ESTestCase {
test.addSimulation(node2.getId(), failure2);
// no fetched data, 2 requests still on going
AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(0));
// fire the first response, it should trigger a reroute
test.fireSimulationAndWait(node1.getId());
assertThat(test.reroute.get(), equalTo(1));
fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
// fire the second simulation, this should allow us to get the data
test.fireSimulationAndWait(node2.getId());
assertThat(test.reroute.get(), equalTo(2));
// since one of those failed, we should only have one entry
fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(1));
assertThat(fetchData.getData().get(node1), sameInstance(response1));
@ -202,7 +201,7 @@ public class AsyncShardFetchTests extends ESTestCase {
test.addSimulation(node1.getId(), response1);
// no fetched data, 2 requests still on going
AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
AsyncShardFetch.FetchResult<Response> fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
assertThat(test.reroute.get(), equalTo(0));
@ -213,14 +212,14 @@ public class AsyncShardFetchTests extends ESTestCase {
nodes = DiscoveryNodes.builder(nodes).put(node2).build();
test.addSimulation(node2.getId(), response2);
// no fetch data, has a new node introduced
fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(false));
// fire the second simulation, this should allow us to get the data
test.fireSimulationAndWait(node2.getId());
// since one of those failed, we should only have one entry
fetchData = test.fetchData(nodes, MetaData.EMPTY_META_DATA, emptySet());
fetchData = test.fetchData(nodes, emptySet());
assertThat(fetchData.hasData(), equalTo(true));
assertThat(fetchData.getData().size(), equalTo(2));
assertThat(fetchData.getData().get(node1), sameInstance(response1));
@ -270,7 +269,7 @@ public class AsyncShardFetchTests extends ESTestCase {
}
@Override
protected void asyncFetch(final ShardId shardId, String[] nodesIds, MetaData metaData) {
protected void asyncFetch(final ShardId shardId, String[] nodesIds) {
for (final String nodeId : nodesIds) {
threadPool.generic().execute(new Runnable() {
@Override

View File

@ -26,13 +26,19 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardPath;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
@ -42,6 +48,9 @@ import org.elasticsearch.test.InternalTestCluster.RestartCallback;
import org.elasticsearch.test.store.MockFSDirectoryService;
import org.elasticsearch.test.store.MockFSIndexStore;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@ -54,10 +63,12 @@ import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
@ClusterScope(numDataNodes = 0, scope = Scope.TEST)
public class RecoveryFromGatewayIT extends ESIntegTestCase {
@ -72,20 +83,20 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
internalCluster().startNode();
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type1")
.startObject("properties").startObject("appAccountIds").field("type", "text").endObject().endObject()
.endObject().endObject().string();
.startObject("properties").startObject("appAccountIds").field("type", "text").endObject().endObject()
.endObject().endObject().string();
assertAcked(prepareCreate("test").addMapping("type1", mapping));
client().prepareIndex("test", "type1", "10990239").setSource(jsonBuilder().startObject()
.startArray("appAccountIds").value(14).value(179).endArray().endObject()).execute().actionGet();
.startArray("appAccountIds").value(14).value(179).endArray().endObject()).execute().actionGet();
client().prepareIndex("test", "type1", "10990473").setSource(jsonBuilder().startObject()
.startArray("appAccountIds").value(14).endArray().endObject()).execute().actionGet();
.startArray("appAccountIds").value(14).endArray().endObject()).execute().actionGet();
client().prepareIndex("test", "type1", "10990513").setSource(jsonBuilder().startObject()
.startArray("appAccountIds").value(14).value(179).endArray().endObject()).execute().actionGet();
.startArray("appAccountIds").value(14).value(179).endArray().endObject()).execute().actionGet();
client().prepareIndex("test", "type1", "10990695").setSource(jsonBuilder().startObject()
.startArray("appAccountIds").value(14).endArray().endObject()).execute().actionGet();
.startArray("appAccountIds").value(14).endArray().endObject()).execute().actionGet();
client().prepareIndex("test", "type1", "11026351").setSource(jsonBuilder().startObject()
.startArray("appAccountIds").value(14).endArray().endObject()).execute().actionGet();
.startArray("appAccountIds").value(14).endArray().endObject()).execute().actionGet();
refresh();
assertHitCount(client().prepareSearch().setSize(0).setQuery(termQuery("appAccountIds", 179)).execute().actionGet(), 2);
@ -141,13 +152,13 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
internalCluster().startNode();
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type1")
.startObject("properties").startObject("field").field("type", "text").endObject().startObject("num").field("type", "integer").endObject().endObject()
.endObject().endObject().string();
.startObject("properties").startObject("field").field("type", "text").endObject().startObject("num").field("type", "integer").endObject().endObject()
.endObject().endObject().string();
// note: default replica settings are tied to #data nodes-1 which is 0 here. We can do with 1 in this test.
int numberOfShards = numberOfShards();
assertAcked(prepareCreate("test").setSettings(
SETTING_NUMBER_OF_SHARDS, numberOfShards(),
SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 1)
SETTING_NUMBER_OF_SHARDS, numberOfShards(),
SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 1)
).addMapping("type1", mapping));
int value1Docs;
@ -170,12 +181,12 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
for (int id = 0; id < Math.max(value1Docs, value2Docs); id++) {
if (id < value1Docs) {
index("test", "type1", "1_" + id,
jsonBuilder().startObject().field("field", "value1").startArray("num").value(14).value(179).endArray().endObject()
jsonBuilder().startObject().field("field", "value1").startArray("num").value(14).value(179).endArray().endObject()
);
}
if (id < value2Docs) {
index("test", "type1", "2_" + id,
jsonBuilder().startObject().field("field", "value2").startArray("num").value(14).endArray().endObject()
jsonBuilder().startObject().field("field", "value2").startArray("num").value(14).endArray().endObject()
);
}
}
@ -341,16 +352,16 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
logger.info("--> add some metadata, additional type and template");
client.admin().indices().preparePutMapping("test").setType("type2")
.setSource(jsonBuilder().startObject().startObject("type2").endObject().endObject())
.execute().actionGet();
.setSource(jsonBuilder().startObject().startObject("type2").endObject().endObject())
.execute().actionGet();
client.admin().indices().preparePutTemplate("template_1")
.setTemplate("te*")
.setOrder(0)
.addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("properties")
.startObject("field1").field("type", "text").field("store", true).endObject()
.startObject("field2").field("type", "keyword").field("store", true).endObject()
.endObject().endObject().endObject())
.execute().actionGet();
.setTemplate("te*")
.setOrder(0)
.addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("properties")
.startObject("field1").field("type", "text").field("store", true).endObject()
.startObject("field2").field("type", "keyword").field("store", true).endObject()
.endObject().endObject().endObject())
.execute().actionGet();
client.admin().indices().prepareAliases().addAlias("test", "test_alias", QueryBuilders.termQuery("field", "value")).execute().actionGet();
logger.info("--> starting two nodes back, verifying we got the latest version");
}
@ -378,19 +389,19 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
public void testReusePeerRecovery() throws Exception {
final Settings settings = Settings.builder()
.put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false)
.put("gateway.recover_after_nodes", 4)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 4)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 4)
.put(MockFSDirectoryService.CRASH_INDEX_SETTING.getKey(), false).build();
.put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false)
.put("gateway.recover_after_nodes", 4)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 4)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 4)
.put(MockFSDirectoryService.CRASH_INDEX_SETTING.getKey(), false).build();
internalCluster().startNodesAsync(4, settings).get();
// prevent any rebalance actions during the peer recovery
// if we run into a relocation the reuse count will be 0 and this fails the test. We are testing here if
// we reuse the files on disk after full restarts for replicas.
assertAcked(prepareCreate("test").setSettings(Settings.builder()
.put(indexSettings())
.put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)));
.put(indexSettings())
.put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE)));
ensureGreen();
logger.info("--> indexing docs");
for (int i = 0; i < 1000; i++) {
@ -413,9 +424,9 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
// Disable allocations while we are closing nodes
client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder()
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE))
.get();
.setTransientSettings(Settings.builder()
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE))
.get();
logger.info("--> full cluster restart");
internalCluster().fullRestart();
@ -430,9 +441,9 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
logger.info("--> disabling allocation while the cluster is shut down{}", useSyncIds ? "" : " a second time");
// Disable allocations while we are closing nodes
client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder()
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE))
.get();
.setTransientSettings(Settings.builder()
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE))
.get();
Map<String, long[]> primaryTerms = assertAndCapturePrimaryTerms(null);
@ -456,8 +467,8 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
}
if (!recoveryState.getPrimary() && (useSyncIds == false)) {
logger.info("--> replica shard {} recovered from {} to {}, recovered {}, reuse {}",
recoveryState.getShardId().getId(), recoveryState.getSourceNode().getName(), recoveryState.getTargetNode().getName(),
recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes());
recoveryState.getShardId().getId(), recoveryState.getSourceNode().getName(), recoveryState.getTargetNode().getName(),
recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes());
assertThat("no bytes should be recovered", recoveryState.getIndex().recoveredBytes(), equalTo(recovered));
assertThat("data should have been reused", recoveryState.getIndex().reusedBytes(), greaterThan(0L));
// we have to recover the segments file since we commit the translog ID on engine startup
@ -468,8 +479,8 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
} else {
if (useSyncIds && !recoveryState.getPrimary()) {
logger.info("--> replica shard {} recovered from {} to {} using sync id, recovered {}, reuse {}",
recoveryState.getShardId().getId(), recoveryState.getSourceNode().getName(), recoveryState.getTargetNode().getName(),
recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes());
recoveryState.getShardId().getId(), recoveryState.getSourceNode().getName(), recoveryState.getTargetNode().getName(),
recoveryState.getIndex().recoveredBytes(), recoveryState.getIndex().reusedBytes());
}
assertThat(recoveryState.getIndex().recoveredBytes(), equalTo(0L));
assertThat(recoveryState.getIndex().reusedBytes(), equalTo(recoveryState.getIndex().totalBytes()));
@ -514,4 +525,58 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
assertHitCount(client().prepareSearch("test").setSize(0).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(), 1);
}
public void testStartedShardFoundIfStateNotYetProcessed() throws Exception {
// nodes may need to report the shards they processed the initial recovered cluster state from the master
final String nodeName = internalCluster().startNode();
assertAcked(prepareCreate("test").setSettings(SETTING_NUMBER_OF_SHARDS, 1));
final Index index = resolveIndex("test");
final ShardId shardId = new ShardId(index, 0);
index("test", "type", "1");
flush("test");
final boolean corrupt = randomBoolean();
internalCluster().fullRestart(new RestartCallback() {
@Override
public Settings onNodeStopped(String nodeName) throws Exception {
// make sure state is not recovered
return Settings.builder().put(GatewayService.RECOVER_AFTER_NODES_SETTING.getKey(), 2).build();
}
});
if (corrupt) {
for (Path path : internalCluster().getInstance(NodeEnvironment.class, nodeName).availableShardPaths(shardId)) {
final Path indexPath = path.resolve(ShardPath.INDEX_FOLDER_NAME);
if (Files.exists(indexPath)) { // multi data path might only have one path in use
try (DirectoryStream<Path> stream = Files.newDirectoryStream(indexPath)) {
for (Path item : stream) {
if (item.getFileName().toString().startsWith("segments_")) {
logger.debug("--> deleting [{}]", item);
Files.delete(item);
}
}
}
}
}
}
DiscoveryNode node = internalCluster().getInstance(ClusterService.class, nodeName).localNode();
TransportNodesListGatewayStartedShards.NodesGatewayStartedShards response;
response = internalCluster().getInstance(TransportNodesListGatewayStartedShards.class)
.execute(new TransportNodesListGatewayStartedShards.Request(shardId, new String[]{node.getId()}))
.get();
assertThat(response.getNodes(), arrayWithSize(1));
assertThat(response.getNodes()[0].allocationId(), notNullValue());
if (corrupt) {
assertThat(response.getNodes()[0].storeException(), notNullValue());
} else {
assertThat(response.getNodes()[0].storeException(), nullValue());
}
// start another node so cluster consistency checks won't time out due to the lack of state
internalCluster().startNode();
}
}