Revert "Removed index level metadata election #17233"
This reverts commit 1264ee79b6
.
This commit is contained in:
parent
14f45c1784
commit
533c967a2d
|
@ -19,6 +19,9 @@
|
||||||
|
|
||||||
package org.elasticsearch.gateway;
|
package org.elasticsearch.gateway;
|
||||||
|
|
||||||
|
import com.carrotsearch.hppc.ObjectFloatHashMap;
|
||||||
|
import com.carrotsearch.hppc.ObjectHashSet;
|
||||||
|
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||||
import org.apache.lucene.util.IOUtils;
|
import org.apache.lucene.util.IOUtils;
|
||||||
import org.elasticsearch.action.FailedNodeException;
|
import org.elasticsearch.action.FailedNodeException;
|
||||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||||
|
@ -31,11 +34,15 @@ import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.discovery.Discovery;
|
import org.elasticsearch.discovery.Discovery;
|
||||||
import org.elasticsearch.env.NodeEnvironment;
|
import org.elasticsearch.env.NodeEnvironment;
|
||||||
|
import org.elasticsearch.index.Index;
|
||||||
|
import org.elasticsearch.index.IndexService;
|
||||||
import org.elasticsearch.index.NodeServicesProvider;
|
import org.elasticsearch.index.NodeServicesProvider;
|
||||||
import org.elasticsearch.indices.IndicesService;
|
import org.elasticsearch.indices.IndicesService;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -84,6 +91,7 @@ public class Gateway extends AbstractComponent implements ClusterStateListener {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ObjectFloatHashMap<Index> indices = new ObjectFloatHashMap<>();
|
||||||
MetaData electedGlobalState = null;
|
MetaData electedGlobalState = null;
|
||||||
int found = 0;
|
int found = 0;
|
||||||
for (TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState) {
|
for (TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState) {
|
||||||
|
@ -96,34 +104,65 @@ public class Gateway extends AbstractComponent implements ClusterStateListener {
|
||||||
} else if (nodeState.metaData().version() > electedGlobalState.version()) {
|
} else if (nodeState.metaData().version() > electedGlobalState.version()) {
|
||||||
electedGlobalState = nodeState.metaData();
|
electedGlobalState = nodeState.metaData();
|
||||||
}
|
}
|
||||||
|
for (ObjectCursor<IndexMetaData> cursor : nodeState.metaData().indices().values()) {
|
||||||
|
indices.addTo(cursor.value.getIndex(), 1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (found < requiredAllocation) {
|
if (found < requiredAllocation) {
|
||||||
listener.onFailure("found [" + found + "] metadata states, required [" + requiredAllocation + "]");
|
listener.onFailure("found [" + found + "] metadata states, required [" + requiredAllocation + "]");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// verify index metadata
|
// update the global state, and clean the indices, we elect them in the next phase
|
||||||
MetaData.Builder metaDataBuilder = MetaData.builder(electedGlobalState);
|
MetaData.Builder metaDataBuilder = MetaData.builder(electedGlobalState).removeAllIndices();
|
||||||
for (IndexMetaData indexMetaData : electedGlobalState) {
|
|
||||||
try {
|
assert !indices.containsKey(null);
|
||||||
if (indexMetaData.getState() == IndexMetaData.State.OPEN) {
|
final Object[] keys = indices.keys;
|
||||||
// verify that we can actually create this index - if not we recover it as closed with lots of warn logs
|
for (int i = 0; i < keys.length; i++) {
|
||||||
indicesService.verifyIndexMetadata(nodeServicesProvider, indexMetaData);
|
if (keys[i] != null) {
|
||||||
|
Index index = (Index) keys[i];
|
||||||
|
IndexMetaData electedIndexMetaData = null;
|
||||||
|
int indexMetaDataCount = 0;
|
||||||
|
for (TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState) {
|
||||||
|
if (nodeState.metaData() == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
IndexMetaData indexMetaData = nodeState.metaData().index(index);
|
||||||
|
if (indexMetaData == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (electedIndexMetaData == null) {
|
||||||
|
electedIndexMetaData = indexMetaData;
|
||||||
|
} else if (indexMetaData.getVersion() > electedIndexMetaData.getVersion()) {
|
||||||
|
electedIndexMetaData = indexMetaData;
|
||||||
|
}
|
||||||
|
indexMetaDataCount++;
|
||||||
|
}
|
||||||
|
if (electedIndexMetaData != null) {
|
||||||
|
if (indexMetaDataCount < requiredAllocation) {
|
||||||
|
logger.debug("[{}] found [{}], required [{}], not adding", index, indexMetaDataCount, requiredAllocation);
|
||||||
|
} // TODO if this logging statement is correct then we are missing an else here
|
||||||
|
try {
|
||||||
|
if (electedIndexMetaData.getState() == IndexMetaData.State.OPEN) {
|
||||||
|
// verify that we can actually create this index - if not we recover it as closed with lots of warn logs
|
||||||
|
indicesService.verifyIndexMetadata(nodeServicesProvider, electedIndexMetaData);
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.warn("recovering index {} failed - recovering as closed", e, electedIndexMetaData.getIndex());
|
||||||
|
electedIndexMetaData = IndexMetaData.builder(electedIndexMetaData).state(IndexMetaData.State.CLOSE).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
metaDataBuilder.put(electedIndexMetaData, false);
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
|
||||||
logger.warn("recovering index {} failed - recovering as closed", e, indexMetaData.getIndex());
|
|
||||||
indexMetaData = IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE).build();
|
|
||||||
metaDataBuilder.put(indexMetaData, true);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ClusterState.Builder builder = ClusterState.builder(clusterService.state().getClusterName());
|
ClusterState.Builder builder = ClusterState.builder(clusterService.state().getClusterName());
|
||||||
builder.metaData(metaDataBuilder);
|
builder.metaData(metaDataBuilder);
|
||||||
listener.onSuccess(builder.build());
|
listener.onSuccess(builder.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void reset() throws Exception {
|
public void reset() throws Exception {
|
||||||
try {
|
try {
|
||||||
Path[] dataPaths = nodeEnv.nodeDataPaths();
|
Path[] dataPaths = nodeEnv.nodeDataPaths();
|
||||||
logger.trace("removing node data paths: [{}]", (Object) dataPaths);
|
logger.trace("removing node data paths: [{}]", (Object)dataPaths);
|
||||||
IOUtils.rm(dataPaths);
|
IOUtils.rm(dataPaths);
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
logger.debug("failed to delete shard locations", ex);
|
logger.debug("failed to delete shard locations", ex);
|
||||||
|
|
Loading…
Reference in New Issue