check if index is closed or was previously closed when gathering relevant indices to write meta state
When an index is opened it will not be assigned to a node but also not have closed state anymore. Before we only checked if an index either is closed or assigned to the data node and therefore the change from close->open was not written.
This commit is contained in:
parent
2713e903ab
commit
e44c5ff703
|
@ -28,7 +28,8 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
|
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.routing.*;
|
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||||
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||||
import org.elasticsearch.common.Nullable;
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
|
@ -98,6 +99,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void clusterChanged(ClusterChangedEvent event) {
|
public void clusterChanged(ClusterChangedEvent event) {
|
||||||
|
|
||||||
Set<String> relevantIndices = new HashSet<>();
|
Set<String> relevantIndices = new HashSet<>();
|
||||||
final ClusterState state = event.state();
|
final ClusterState state = event.state();
|
||||||
if (state.blocks().disableStatePersistence()) {
|
if (state.blocks().disableStatePersistence()) {
|
||||||
|
@ -148,7 +150,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
|
||||||
}
|
}
|
||||||
|
|
||||||
Iterable<IndexMetaWriteInfo> writeInfo;
|
Iterable<IndexMetaWriteInfo> writeInfo;
|
||||||
relevantIndices = getRelevantIndices(event.state(), previouslyWrittenIndices);
|
relevantIndices = getRelevantIndices(event.state(), event.previousState(), previouslyWrittenIndices);
|
||||||
writeInfo = resolveStatesToBeWritten(previouslyWrittenIndices, relevantIndices, previousMetaData, event.state().metaData());
|
writeInfo = resolveStatesToBeWritten(previouslyWrittenIndices, relevantIndices, previousMetaData, event.state().metaData());
|
||||||
// check and write changes in indices
|
// check and write changes in indices
|
||||||
for (IndexMetaWriteInfo indexMetaWrite : writeInfo) {
|
for (IndexMetaWriteInfo indexMetaWrite : writeInfo) {
|
||||||
|
@ -169,10 +171,10 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Set<String> getRelevantIndices(ClusterState state, ImmutableSet<String> previouslyWrittenIndices) {
|
public static Set<String> getRelevantIndices(ClusterState state, ClusterState previousState,ImmutableSet<String> previouslyWrittenIndices) {
|
||||||
Set<String> relevantIndices;
|
Set<String> relevantIndices;
|
||||||
if (isDataOnlyNode(state)) {
|
if (isDataOnlyNode(state)) {
|
||||||
relevantIndices = getRelevantIndicesOnDataOnlyNode(state, previouslyWrittenIndices);
|
relevantIndices = getRelevantIndicesOnDataOnlyNode(state, previousState, previouslyWrittenIndices);
|
||||||
} else if (state.nodes().localNode().masterNode() == true) {
|
} else if (state.nodes().localNode().masterNode() == true) {
|
||||||
relevantIndices = getRelevantIndicesForMasterEligibleNode(state);
|
relevantIndices = getRelevantIndicesForMasterEligibleNode(state);
|
||||||
} else {
|
} else {
|
||||||
|
@ -278,7 +280,7 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
|
||||||
return indicesToWrite;
|
return indicesToWrite;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Set<String> getRelevantIndicesOnDataOnlyNode(ClusterState state, ImmutableSet<String> previouslyWrittenIndices) {
|
public static Set<String> getRelevantIndicesOnDataOnlyNode(ClusterState state, ClusterState previousState, ImmutableSet<String> previouslyWrittenIndices) {
|
||||||
RoutingNode newRoutingNode = state.getRoutingNodes().node(state.nodes().localNodeId());
|
RoutingNode newRoutingNode = state.getRoutingNodes().node(state.nodes().localNodeId());
|
||||||
if (newRoutingNode == null) {
|
if (newRoutingNode == null) {
|
||||||
throw new IllegalStateException("cluster state does not contain this node - cannot write index meta state");
|
throw new IllegalStateException("cluster state does not contain this node - cannot write index meta state");
|
||||||
|
@ -289,7 +291,14 @@ public class GatewayMetaState extends AbstractComponent implements ClusterStateL
|
||||||
}
|
}
|
||||||
// we have to check the meta data also: closed indices will not appear in the routing table, but we must still write the state if we have it written on disk previously
|
// we have to check the meta data also: closed indices will not appear in the routing table, but we must still write the state if we have it written on disk previously
|
||||||
for (IndexMetaData indexMetaData : state.metaData()) {
|
for (IndexMetaData indexMetaData : state.metaData()) {
|
||||||
if (previouslyWrittenIndices.contains(indexMetaData.getIndex()) && state.metaData().getIndices().get(indexMetaData.getIndex()).state().equals(IndexMetaData.State.CLOSE)) {
|
boolean isOrWasClosed = indexMetaData.state().equals(IndexMetaData.State.CLOSE);
|
||||||
|
// if the index is open we might still have to write the state if it just transitioned from closed to open
|
||||||
|
// so we have to check for that as well.
|
||||||
|
IndexMetaData previousMetaData = previousState.metaData().getIndices().get(indexMetaData.getIndex());
|
||||||
|
if (previousMetaData != null) {
|
||||||
|
isOrWasClosed = isOrWasClosed || previousMetaData.state().equals(IndexMetaData.State.CLOSE);
|
||||||
|
}
|
||||||
|
if (previouslyWrittenIndices.contains(indexMetaData.getIndex()) && isOrWasClosed) {
|
||||||
indices.add(indexMetaData.getIndex());
|
indices.add(indexMetaData.getIndex());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -174,9 +174,9 @@ public class GatewayMetaStateTests extends ElasticsearchAllocationTestCase {
|
||||||
if (stateInMemory) {
|
if (stateInMemory) {
|
||||||
inMemoryMetaData = event.previousState().metaData();
|
inMemoryMetaData = event.previousState().metaData();
|
||||||
ImmutableSet.Builder<String> relevantIndices = ImmutableSet.builder();
|
ImmutableSet.Builder<String> relevantIndices = ImmutableSet.builder();
|
||||||
oldIndicesList = relevantIndices.addAll(GatewayMetaState.getRelevantIndices(event.previousState(), oldIndicesList)).build();
|
oldIndicesList = relevantIndices.addAll(GatewayMetaState.getRelevantIndices(event.previousState(), event.previousState(), oldIndicesList)).build();
|
||||||
}
|
}
|
||||||
Set<String> newIndicesList = GatewayMetaState.getRelevantIndices(event.state(), oldIndicesList);
|
Set<String> newIndicesList = GatewayMetaState.getRelevantIndices(event.state(),event.previousState(), oldIndicesList);
|
||||||
// third, get the actual write info
|
// third, get the actual write info
|
||||||
Iterator<GatewayMetaState.IndexMetaWriteInfo> indices = GatewayMetaState.resolveStatesToBeWritten(oldIndicesList, newIndicesList, inMemoryMetaData, event.state().metaData()).iterator();
|
Iterator<GatewayMetaState.IndexMetaWriteInfo> indices = GatewayMetaState.resolveStatesToBeWritten(oldIndicesList, newIndicesList, inMemoryMetaData, event.state().metaData()).iterator();
|
||||||
|
|
||||||
|
|
|
@ -124,14 +124,13 @@ public class MetaDataWriteDataNodesTests extends ElasticsearchIntegrationTest {
|
||||||
assertNotNull(((LinkedHashMap) (indicesMetaData.get(index).getMappings().get("doc").getSourceAsMap().get("properties"))).get("integer_field"));
|
assertNotNull(((LinkedHashMap) (indicesMetaData.get(index).getMappings().get("doc").getSourceAsMap().get("properties"))).get("integer_field"));
|
||||||
assertThat(indicesMetaData.get(index).state(), equalTo(IndexMetaData.State.CLOSE));
|
assertThat(indicesMetaData.get(index).state(), equalTo(IndexMetaData.State.CLOSE));
|
||||||
|
|
||||||
/**
|
/* Try the same and see if this also works if node was just restarted.
|
||||||
* Try the same and see if this also works if node was just restarted.
|
|
||||||
* Each node holds an array of indices it knows of and checks if it should
|
* Each node holds an array of indices it knows of and checks if it should
|
||||||
* write new meta data by looking up in this array. We need it because if an
|
* write new meta data by looking up in this array. We need it because if an
|
||||||
* index is closed it will not appear in the shard routing and we therefore
|
* index is closed it will not appear in the shard routing and we therefore
|
||||||
* need to keep track of what we wrote before. However, when the node is
|
* need to keep track of what we wrote before. However, when the node is
|
||||||
* restarted this array is empty and we have to fill it before we decide
|
* restarted this array is empty and we have to fill it before we decide
|
||||||
* what we write. This is why I explicitly test for it.
|
* what we write. This is why we explicitly test for it.
|
||||||
*/
|
*/
|
||||||
internalCluster().restartNode(dataNode, new RestartCallback());
|
internalCluster().restartNode(dataNode, new RestartCallback());
|
||||||
client().admin().indices().preparePutMapping(index).setType("doc").setSource(jsonBuilder().startObject()
|
client().admin().indices().preparePutMapping(index).setType("doc").setSource(jsonBuilder().startObject()
|
||||||
|
@ -151,19 +150,9 @@ public class MetaDataWriteDataNodesTests extends ElasticsearchIntegrationTest {
|
||||||
assertThat(indicesMetaData.get(index).state(), equalTo(IndexMetaData.State.CLOSE));
|
assertThat(indicesMetaData.get(index).state(), equalTo(IndexMetaData.State.CLOSE));
|
||||||
|
|
||||||
// finally check that meta data is also written of index opened again
|
// finally check that meta data is also written of index opened again
|
||||||
client().admin().indices().prepareOpen(index).get();
|
assertAcked(client().admin().indices().prepareOpen(index).get());
|
||||||
assertBusy(new Runnable() {
|
indicesMetaData = getIndicesMetaDataOnNode(dataNode);
|
||||||
@Override
|
assertThat(indicesMetaData.get(index).state(), equalTo(IndexMetaData.State.OPEN));
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
ImmutableOpenMap<String, IndexMetaData> indicesMetaData = getIndicesMetaDataOnNode(dataNode);
|
|
||||||
assertThat(indicesMetaData.get(index).state(), equalTo(IndexMetaData.State.OPEN));
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.info("caught exception while reading meta state: ", e);
|
|
||||||
fail();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void assertIndexNotInMetaState(String nodeName, String indexName) throws Exception {
|
protected void assertIndexNotInMetaState(String nodeName, String indexName) throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue