diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java index c6e9182fd8f..f9433ee6059 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java @@ -29,6 +29,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateApplier; import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState; import org.elasticsearch.cluster.coordination.InMemoryPersistedState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -36,8 +37,6 @@ import org.elasticsearch.cluster.metadata.Manifest; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.RoutingNode; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.Tuple; @@ -49,124 +48,104 @@ import org.elasticsearch.plugins.MetaDataUpgrader; import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; -import java.util.List; import java.util.Map; -import java.util.Set; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.UnaryOperator; /** - * This class is responsible for storing/retrieving metadata to/from disk. - * When instance of this class is created, constructor ensures that this version is compatible with state stored on disk and performs - * state upgrade if necessary. Also it checks that atomic move is supported on the filesystem level, because it's a must for metadata - * store algorithm. - * Please note that the state being loaded when constructing the instance of this class is NOT the state that will be used as a - * {@link ClusterState#metaData()}. Instead when node is starting up, it calls {@link #getMetaData()} method and if this node is - * elected as master, it requests metaData from other master eligible nodes. After that, master node performs re-conciliation on the - * gathered results, re-creates {@link ClusterState} and broadcasts this state to other nodes in the cluster. + * Loads (and maybe upgrades) cluster metadata at startup, and persistently stores cluster metadata for future restarts. + * + * When started, ensures that this version is compatible with the state stored on disk, and performs a state upgrade if necessary. Note that + * the state being loaded when constructing the instance of this class is not necessarily the state that will be used as {@link + * ClusterState#metaData()} because it might be stale or incomplete. Master-eligible nodes must perform an election to find a complete and + * non-stale state, and master-ineligible nodes receive the real cluster state from the elected master after joining the cluster. 
*/ -public class GatewayMetaState implements PersistedState { - protected static final Logger logger = LogManager.getLogger(GatewayMetaState.class); +public class GatewayMetaState { + private static final Logger logger = LogManager.getLogger(GatewayMetaState.class); - private final MetaStateService metaStateService; - private final Settings settings; - - // On master-eligible Zen2 nodes, we use this very object for the PersistedState (so that the state is actually persisted); on other - // nodes we use an InMemoryPersistedState instead and persist using a cluster applier if needed. In all cases it's an error to try and - // use this object as a PersistedState before calling start(). TODO stop implementing PersistedState at the top level. + // Set by calling start() private final SetOnce persistedState = new SetOnce<>(); - // on master-eligible nodes we call updateClusterState under the Coordinator's mutex; on master-ineligible data nodes we call - // updateClusterState on the (unique) cluster applier thread; on other nodes we never call updateClusterState. In all cases there's no - // need to synchronize access to these variables. - protected Manifest previousManifest; - protected ClusterState previousClusterState; - protected boolean incrementalWrite; - - public GatewayMetaState(Settings settings, MetaStateService metaStateService) { - this.settings = settings; - this.metaStateService = metaStateService; + public PersistedState getPersistedState() { + final PersistedState persistedState = this.persistedState.get(); + assert persistedState != null : "not started"; + return persistedState; } - public void start(TransportService transportService, ClusterService clusterService, - MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader) { - assert previousClusterState == null : "should only start once, but already have " + previousClusterState; + public MetaData getMetaData() { + return getPersistedState().getLastAcceptedState().metaData(); + } + + public void start(Settings settings, TransportService transportService, ClusterService clusterService, + MetaStateService metaStateService, MetaDataIndexUpgradeService metaDataIndexUpgradeService, + MetaDataUpgrader metaDataUpgrader) { + assert persistedState.get() == null : "should only start once, but already have " + persistedState.get(); + + final Tuple manifestClusterStateTuple; try { - upgradeMetaData(metaDataIndexUpgradeService, metaDataUpgrader); - initializeClusterState(ClusterName.CLUSTER_NAME_SETTING.get(settings)); + upgradeMetaData(settings, metaStateService, metaDataIndexUpgradeService, metaDataUpgrader); + manifestClusterStateTuple = loadStateAndManifest(ClusterName.CLUSTER_NAME_SETTING.get(settings), metaStateService); } catch (IOException e) { throw new ElasticsearchException("failed to load metadata", e); } - incrementalWrite = false; - - applyClusterStateUpdaters(transportService, clusterService); + final IncrementalClusterStateWriter incrementalClusterStateWriter + = new IncrementalClusterStateWriter(metaStateService, manifestClusterStateTuple.v1(), + prepareInitialClusterState(transportService, clusterService, manifestClusterStateTuple.v2())); if (DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings).equals(DiscoveryModule.ZEN_DISCOVERY_TYPE)) { - // only for tests that simulate a mixed Zen1/Zen2 clusters, see Zen1IT - if (isMasterOrDataNode()) { - clusterService.addLowPriorityApplier(this::applyClusterState); + // only for tests that simulate mixed Zen1/Zen2 clusters, see Zen1IT + if 
(isMasterOrDataNode(settings)) { + clusterService.addLowPriorityApplier(new GatewayClusterApplier(incrementalClusterStateWriter)); } - persistedState.set(new InMemoryPersistedState(getCurrentTerm(), getLastAcceptedState())); + persistedState.set(new InMemoryPersistedState(manifestClusterStateTuple.v1().getCurrentTerm(), manifestClusterStateTuple.v2())); + } else if (DiscoveryNode.isMasterNode(settings) == false) { + if (DiscoveryNode.isDataNode(settings)) { + // Master-eligible nodes persist index metadata for all indices regardless of whether they hold any shards or not. It's + // vitally important to the safety of the cluster coordination system that master-eligible nodes persist this metadata when + // _accepting_ the cluster state (i.e. before it is committed). This persistence happens on the generic threadpool. + // + // In contrast, master-ineligible data nodes only persist the index metadata for shards that they hold. When all shards of + // an index are moved off such a node the IndicesStore is responsible for removing the corresponding index directory, + // including the metadata, and does so on the cluster applier thread. + // + // This presents a problem: if a shard is unassigned from a node and then reassigned back to it again then there is a race + // between the IndicesStore deleting the index folder and the CoordinationState concurrently trying to write the updated + // metadata into it. We could probably solve this with careful synchronization, but in fact there is no need. The persisted + // state on master-ineligible data nodes is mostly ignored - it's only there to support dangling index imports, which is + // inherently unsafe anyway. Thus we can safely delay metadata writes on master-ineligible data nodes until applying the + // cluster state, which is what this does: + clusterService.addLowPriorityApplier(new GatewayClusterApplier(incrementalClusterStateWriter)); + } + + // Master-ineligible nodes do not need to persist the cluster state when accepting it because they are not in the voting + // configuration, so it's ok if they have a stale or incomplete cluster state when restarted. We track the latest cluster state + // in memory instead. + persistedState.set(new InMemoryPersistedState(manifestClusterStateTuple.v1().getCurrentTerm(), manifestClusterStateTuple.v2())); } else { - if (DiscoveryNode.isMasterNode(settings) == false) { - if (DiscoveryNode.isDataNode(settings)) { - // Master-eligible nodes persist index metadata for all indices regardless of whether they hold any shards or not. It's - // vitally important to the safety of the cluster coordination system that master-eligible nodes persist this metadata - // when _accepting_ the cluster state (i.e. before it is committed). This persistence happens on the generic threadpool. - // - // In contrast, master-ineligible data nodes only persist the index metadata for shards that they hold. When all shards - // of an index are moved off such a node the IndicesStore is responsible for removing the corresponding index directory, - // including the metadata, and does so on the cluster applier thread. - // - // This presents a problem: if a shard is unassigned from a node and then reassigned back to it again then there is a - // race between the IndicesStore deleting the index folder and the CoordinationState concurrently trying to write the - // updated metadata into it. We could probably solve this with careful synchronization, but in fact there is no need. 
- // The persisted state on master-ineligible data nodes is mostly ignored - it's only there to support dangling index - // imports, which is inherently unsafe anyway. Thus we can safely delay metadata writes on master-ineligible data nodes - // until applying the cluster state, which is what this does: - clusterService.addLowPriorityApplier(this::applyClusterState); - } - persistedState.set(new InMemoryPersistedState(getCurrentTerm(), getLastAcceptedState())); - } else { - persistedState.set(this); - } + // Master-ineligible nodes must persist the cluster state when accepting it because they must reload the (complete, fresh) + // last-accepted cluster state when restarted. + persistedState.set(new GatewayPersistedState(incrementalClusterStateWriter)); } } - private void initializeClusterState(ClusterName clusterName) throws IOException { - long startNS = System.nanoTime(); - Tuple manifestAndMetaData = metaStateService.loadFullState(); - previousManifest = manifestAndMetaData.v1(); - - final MetaData metaData = manifestAndMetaData.v2(); - - previousClusterState = ClusterState.builder(clusterName) - .version(previousManifest.getClusterStateVersion()) - .metaData(metaData).build(); - - logger.debug("took {} to load state", TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - startNS))); - } - - protected void applyClusterStateUpdaters(TransportService transportService, ClusterService clusterService) { - assert previousClusterState.nodes().getLocalNode() == null : "applyClusterStateUpdaters must only be called once"; + // exposed so it can be overridden by tests + ClusterState prepareInitialClusterState(TransportService transportService, ClusterService clusterService, ClusterState clusterState) { + assert clusterState.nodes().getLocalNode() == null : "prepareInitialClusterState must only be called once"; assert transportService.getLocalNode() != null : "transport service is not yet started"; - - previousClusterState = Function.identity() + return Function.identity() .andThen(ClusterStateUpdaters::addStateNotRecoveredBlock) .andThen(state -> ClusterStateUpdaters.setLocalNode(state, transportService.getLocalNode())) .andThen(state -> ClusterStateUpdaters.upgradeAndArchiveUnknownOrInvalidSettings(state, clusterService.getClusterSettings())) .andThen(ClusterStateUpdaters::recoverClusterBlocks) - .apply(previousClusterState); + .apply(clusterState); } - protected void upgradeMetaData(MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader) - throws IOException { - if (isMasterOrDataNode()) { + // exposed so it can be overridden by tests + void upgradeMetaData(Settings settings, MetaStateService metaStateService, MetaDataIndexUpgradeService metaDataIndexUpgradeService, + MetaDataUpgrader metaDataUpgrader) throws IOException { + if (isMasterOrDataNode(settings)) { try { final Tuple metaStateAndData = metaStateService.loadFullState(); final Manifest manifest = metaStateAndData.v1(); @@ -179,7 +158,8 @@ public class GatewayMetaState implements PersistedState { // if there is manifest file, it means metadata is properly persisted to all data paths // if there is no manifest file (upgrade from 6.x to 7.x) metadata might be missing on some data paths, // but anyway we will re-write it as soon as we receive first ClusterState - final AtomicClusterStateWriter writer = new AtomicClusterStateWriter(metaStateService, manifest); + final IncrementalClusterStateWriter.AtomicClusterStateWriter writer + = new 
IncrementalClusterStateWriter.AtomicClusterStateWriter(metaStateService, manifest); final MetaData upgradedMetaData = upgradeMetaData(metaData, metaDataIndexUpgradeService, metaDataUpgrader); final long globalStateGeneration; @@ -207,233 +187,25 @@ public class GatewayMetaState implements PersistedState { } } - private boolean isMasterOrDataNode() { + private static Tuple loadStateAndManifest(ClusterName clusterName, + MetaStateService metaStateService) throws IOException { + final long startNS = System.nanoTime(); + final Tuple manifestAndMetaData = metaStateService.loadFullState(); + final Manifest manifest = manifestAndMetaData.v1(); + + final ClusterState clusterState = ClusterState.builder(clusterName) + .version(manifest.getClusterStateVersion()) + .metaData(manifestAndMetaData.v2()).build(); + + logger.debug("took {} to load state", TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - startNS))); + + return Tuple.tuple(manifest, clusterState); + } + + private static boolean isMasterOrDataNode(Settings settings) { return DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings); } - public PersistedState getPersistedState() { - final PersistedState persistedState = this.persistedState.get(); - assert persistedState != null : "not started"; - return persistedState; - } - - public MetaData getMetaData() { - return previousClusterState.metaData(); - } - - private void applyClusterState(ClusterChangedEvent event) { - assert isMasterOrDataNode(); - - if (event.state().blocks().disableStatePersistence()) { - incrementalWrite = false; - return; - } - - try { - // Hack: This is to ensure that non-master-eligible Zen2 nodes always store a current term - // that's higher than the last accepted term. - // TODO: can we get rid of this hack? 
- if (event.state().term() > getCurrentTerm()) { - innerSetCurrentTerm(event.state().term()); - } - - updateClusterState(event.state(), event.previousState()); - incrementalWrite = true; - } catch (WriteStateException e) { - logger.warn("Exception occurred when storing new meta data", e); - } - } - - @Override - public long getCurrentTerm() { - return previousManifest.getCurrentTerm(); - } - - @Override - public ClusterState getLastAcceptedState() { - assert previousClusterState.nodes().getLocalNode() != null : "Cluster state is not fully built yet"; - return previousClusterState; - } - - @Override - public void setCurrentTerm(long currentTerm) { - try { - innerSetCurrentTerm(currentTerm); - } catch (WriteStateException e) { - logger.error(new ParameterizedMessage("Failed to set current term to {}", currentTerm), e); - e.rethrowAsErrorOrUncheckedException(); - } - } - - private void innerSetCurrentTerm(long currentTerm) throws WriteStateException { - Manifest manifest = new Manifest(currentTerm, previousManifest.getClusterStateVersion(), previousManifest.getGlobalGeneration(), - new HashMap<>(previousManifest.getIndexGenerations())); - metaStateService.writeManifestAndCleanup("current term changed", manifest); - previousManifest = manifest; - } - - @Override - public void setLastAcceptedState(ClusterState clusterState) { - try { - incrementalWrite = previousClusterState.term() == clusterState.term(); - updateClusterState(clusterState, previousClusterState); - } catch (WriteStateException e) { - logger.error(new ParameterizedMessage("Failed to set last accepted state with version {}", clusterState.version()), e); - e.rethrowAsErrorOrUncheckedException(); - } - } - - /** - * This class is used to write changed global {@link MetaData}, {@link IndexMetaData} and {@link Manifest} to disk. - * This class delegates write* calls to corresponding write calls in {@link MetaStateService} and - * additionally it keeps track of cleanup actions to be performed if transaction succeeds or fails. 
- */ - static class AtomicClusterStateWriter { - private static final String FINISHED_MSG = "AtomicClusterStateWriter is finished"; - private final List commitCleanupActions; - private final List rollbackCleanupActions; - private final Manifest previousManifest; - private final MetaStateService metaStateService; - private boolean finished; - - AtomicClusterStateWriter(MetaStateService metaStateService, Manifest previousManifest) { - this.metaStateService = metaStateService; - assert previousManifest != null; - this.previousManifest = previousManifest; - this.commitCleanupActions = new ArrayList<>(); - this.rollbackCleanupActions = new ArrayList<>(); - this.finished = false; - } - - long writeGlobalState(String reason, MetaData metaData) throws WriteStateException { - assert finished == false : FINISHED_MSG; - try { - rollbackCleanupActions.add(() -> metaStateService.cleanupGlobalState(previousManifest.getGlobalGeneration())); - long generation = metaStateService.writeGlobalState(reason, metaData); - commitCleanupActions.add(() -> metaStateService.cleanupGlobalState(generation)); - return generation; - } catch (WriteStateException e) { - rollback(); - throw e; - } - } - - long writeIndex(String reason, IndexMetaData metaData) throws WriteStateException { - assert finished == false : FINISHED_MSG; - try { - Index index = metaData.getIndex(); - Long previousGeneration = previousManifest.getIndexGenerations().get(index); - if (previousGeneration != null) { - // we prefer not to clean-up index metadata in case of rollback, - // if it's not referenced by previous manifest file - // not to break dangling indices functionality - rollbackCleanupActions.add(() -> metaStateService.cleanupIndex(index, previousGeneration)); - } - long generation = metaStateService.writeIndex(reason, metaData); - commitCleanupActions.add(() -> metaStateService.cleanupIndex(index, generation)); - return generation; - } catch (WriteStateException e) { - rollback(); - throw e; - } - } - - void writeManifestAndCleanup(String reason, Manifest manifest) throws WriteStateException { - assert finished == false : FINISHED_MSG; - try { - metaStateService.writeManifestAndCleanup(reason, manifest); - commitCleanupActions.forEach(Runnable::run); - finished = true; - } catch (WriteStateException e) { - // if Manifest write results in dirty WriteStateException it's not safe to remove - // new metadata files, because if Manifest was actually written to disk and its deletion - // fails it will reference these new metadata files. - // In the future, we might decide to add more fine grained check to understand if after - // WriteStateException Manifest deletion has actually failed. - if (e.isDirty() == false) { - rollback(); - } - throw e; - } - } - - void rollback() { - rollbackCleanupActions.forEach(Runnable::run); - finished = true; - } - } - - /** - * Updates manifest and meta data on disk. - * - * @param newState new {@link ClusterState} - * @param previousState previous {@link ClusterState} - * - * @throws WriteStateException if exception occurs. See also {@link WriteStateException#isDirty()}. 
- */ - private void updateClusterState(ClusterState newState, ClusterState previousState) - throws WriteStateException { - MetaData newMetaData = newState.metaData(); - - final AtomicClusterStateWriter writer = new AtomicClusterStateWriter(metaStateService, previousManifest); - long globalStateGeneration = writeGlobalState(writer, newMetaData); - Map indexGenerations = writeIndicesMetadata(writer, newState, previousState); - Manifest manifest = new Manifest(previousManifest.getCurrentTerm(), newState.version(), globalStateGeneration, indexGenerations); - writeManifest(writer, manifest); - - previousManifest = manifest; - previousClusterState = newState; - } - - private void writeManifest(AtomicClusterStateWriter writer, Manifest manifest) throws WriteStateException { - if (manifest.equals(previousManifest) == false) { - writer.writeManifestAndCleanup("changed", manifest); - } - } - - private Map writeIndicesMetadata(AtomicClusterStateWriter writer, ClusterState newState, ClusterState previousState) - throws WriteStateException { - Map previouslyWrittenIndices = previousManifest.getIndexGenerations(); - Set relevantIndices = getRelevantIndices(newState, previousState, previouslyWrittenIndices.keySet()); - - Map newIndices = new HashMap<>(); - - MetaData previousMetaData = incrementalWrite ? previousState.metaData() : null; - Iterable actions = resolveIndexMetaDataActions(previouslyWrittenIndices, relevantIndices, previousMetaData, - newState.metaData()); - - for (IndexMetaDataAction action : actions) { - long generation = action.execute(writer); - newIndices.put(action.getIndex(), generation); - } - - return newIndices; - } - - private long writeGlobalState(AtomicClusterStateWriter writer, MetaData newMetaData) - throws WriteStateException { - if (incrementalWrite == false || MetaData.isGlobalStateEquals(previousClusterState.metaData(), newMetaData) == false) { - return writer.writeGlobalState("changed", newMetaData); - } - return previousManifest.getGlobalGeneration(); - } - - public static Set getRelevantIndices(ClusterState state, ClusterState previousState, Set previouslyWrittenIndices) { - Set relevantIndices; - if (isDataOnlyNode(state)) { - relevantIndices = getRelevantIndicesOnDataOnlyNode(state, previousState, previouslyWrittenIndices); - } else if (state.nodes().getLocalNode().isMasterNode()) { - relevantIndices = getRelevantIndicesForMasterEligibleNode(state); - } else { - relevantIndices = Collections.emptySet(); - } - return relevantIndices; - } - - private static boolean isDataOnlyNode(ClusterState state) { - return state.nodes().getLocalNode().isMasterNode() == false && state.nodes().getLocalNode().isDataNode(); - } - /** * Elasticsearch 2.0 removed several deprecated features and as well as support for Lucene 3.x. This method calls * {@link MetaDataIndexUpgradeService} to makes sure that indices are compatible with the current version. The @@ -489,160 +261,81 @@ public class GatewayMetaState implements PersistedState { return false; } - /** - * Returns list of {@link IndexMetaDataAction} for each relevant index. - * For each relevant index there are 3 options: - *
    - *
- * <ol>
- * <li>
- * {@link KeepPreviousGeneration} - index metadata is already stored to disk and index metadata version is not changed, no
- * action is required.
- * </li>
- * <li>
- * {@link WriteNewIndexMetaData} - there is no index metadata on disk and index metadata for this index should be written.
- * </li>
- * <li>
- * {@link WriteChangedIndexMetaData} - index metadata is already on disk, but index metadata version has changed. Updated
- * index metadata should be written to disk.
- * </li>
- * </ol>
- * - * @param previouslyWrittenIndices A list of indices for which the state was already written before - * @param relevantIndices The list of indices for which state should potentially be written - * @param previousMetaData The last meta data we know of - * @param newMetaData The new metadata - * @return list of {@link IndexMetaDataAction} for each relevant index. - */ - public static List resolveIndexMetaDataActions(Map previouslyWrittenIndices, - Set relevantIndices, - MetaData previousMetaData, - MetaData newMetaData) { - List actions = new ArrayList<>(); - for (Index index : relevantIndices) { - IndexMetaData newIndexMetaData = newMetaData.getIndexSafe(index); - IndexMetaData previousIndexMetaData = previousMetaData == null ? null : previousMetaData.index(index); - if (previouslyWrittenIndices.containsKey(index) == false || previousIndexMetaData == null) { - actions.add(new WriteNewIndexMetaData(newIndexMetaData)); - } else if (previousIndexMetaData.getVersion() != newIndexMetaData.getVersion()) { - actions.add(new WriteChangedIndexMetaData(previousIndexMetaData, newIndexMetaData)); - } else { - actions.add(new KeepPreviousGeneration(index, previouslyWrittenIndices.get(index))); + private static class GatewayClusterApplier implements ClusterStateApplier { + + private final IncrementalClusterStateWriter incrementalClusterStateWriter; + + private GatewayClusterApplier(IncrementalClusterStateWriter incrementalClusterStateWriter) { + this.incrementalClusterStateWriter = incrementalClusterStateWriter; + } + + @Override + public void applyClusterState(ClusterChangedEvent event) { + if (event.state().blocks().disableStatePersistence()) { + incrementalClusterStateWriter.setIncrementalWrite(false); + return; + } + + try { + // Hack: This is to ensure that non-master-eligible Zen2 nodes always store a current term + // that's higher than the last accepted term. + // TODO: can we get rid of this hack? 
+ if (event.state().term() > incrementalClusterStateWriter.getPreviousManifest().getCurrentTerm()) { + incrementalClusterStateWriter.setCurrentTerm(event.state().term()); + } + + incrementalClusterStateWriter.updateClusterState(event.state(), event.previousState()); + incrementalClusterStateWriter.setIncrementalWrite(true); + } catch (WriteStateException e) { + logger.warn("Exception occurred when storing new meta data", e); } } - return actions; + } - private static Set getRelevantIndicesOnDataOnlyNode(ClusterState state, ClusterState previousState, Set - previouslyWrittenIndices) { - RoutingNode newRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); - if (newRoutingNode == null) { - throw new IllegalStateException("cluster state does not contain this node - cannot write index meta state"); + private static class GatewayPersistedState implements PersistedState { + + private final IncrementalClusterStateWriter incrementalClusterStateWriter; + + GatewayPersistedState(IncrementalClusterStateWriter incrementalClusterStateWriter) { + this.incrementalClusterStateWriter = incrementalClusterStateWriter; } - Set indices = new HashSet<>(); - for (ShardRouting routing : newRoutingNode) { - indices.add(routing.index()); + + @Override + public long getCurrentTerm() { + return incrementalClusterStateWriter.getPreviousManifest().getCurrentTerm(); } - // we have to check the meta data also: closed indices will not appear in the routing table, but we must still write the state if - // we have it written on disk previously - for (IndexMetaData indexMetaData : state.metaData()) { - boolean isOrWasClosed = indexMetaData.getState().equals(IndexMetaData.State.CLOSE); - // if the index is open we might still have to write the state if it just transitioned from closed to open - // so we have to check for that as well. - IndexMetaData previousMetaData = previousState.metaData().index(indexMetaData.getIndex()); - if (previousMetaData != null) { - isOrWasClosed = isOrWasClosed || previousMetaData.getState().equals(IndexMetaData.State.CLOSE); - } - if (previouslyWrittenIndices.contains(indexMetaData.getIndex()) && isOrWasClosed) { - indices.add(indexMetaData.getIndex()); + + @Override + public ClusterState getLastAcceptedState() { + final ClusterState previousClusterState = incrementalClusterStateWriter.getPreviousClusterState(); + assert previousClusterState.nodes().getLocalNode() != null : "Cluster state is not fully built yet"; + return previousClusterState; + } + + @Override + public void setCurrentTerm(long currentTerm) { + try { + incrementalClusterStateWriter.setCurrentTerm(currentTerm); + } catch (WriteStateException e) { + logger.error(new ParameterizedMessage("Failed to set current term to {}", currentTerm), e); + e.rethrowAsErrorOrUncheckedException(); } } - return indices; - } - - private static Set getRelevantIndicesForMasterEligibleNode(ClusterState state) { - Set relevantIndices = new HashSet<>(); - // we have to iterate over the metadata to make sure we also capture closed indices - for (IndexMetaData indexMetaData : state.metaData()) { - relevantIndices.add(indexMetaData.getIndex()); - } - return relevantIndices; - } - - /** - * Action to perform with index metadata. - */ - public interface IndexMetaDataAction { - /** - * @return index for index metadata. - */ - Index getIndex(); - - /** - * Executes this action using provided {@link AtomicClusterStateWriter}. - * - * @return new index metadata state generation, to be used in manifest file. 
- * @throws WriteStateException if exception occurs. - */ - long execute(AtomicClusterStateWriter writer) throws WriteStateException; - } - - public static class KeepPreviousGeneration implements IndexMetaDataAction { - private final Index index; - private final long generation; - - KeepPreviousGeneration(Index index, long generation) { - this.index = index; - this.generation = generation; - } @Override - public Index getIndex() { - return index; + public void setLastAcceptedState(ClusterState clusterState) { + try { + final ClusterState previousClusterState = incrementalClusterStateWriter.getPreviousClusterState(); + incrementalClusterStateWriter.setIncrementalWrite(previousClusterState.term() == clusterState.term()); + incrementalClusterStateWriter.updateClusterState(clusterState, previousClusterState); + } catch (WriteStateException e) { + logger.error(new ParameterizedMessage("Failed to set last accepted state with version {}", clusterState.version()), e); + e.rethrowAsErrorOrUncheckedException(); + } } - @Override - public long execute(AtomicClusterStateWriter writer) { - return generation; - } } - public static class WriteNewIndexMetaData implements IndexMetaDataAction { - private final IndexMetaData indexMetaData; - - WriteNewIndexMetaData(IndexMetaData indexMetaData) { - this.indexMetaData = indexMetaData; - } - - @Override - public Index getIndex() { - return indexMetaData.getIndex(); - } - - @Override - public long execute(AtomicClusterStateWriter writer) throws WriteStateException { - return writer.writeIndex("freshly created", indexMetaData); - } - } - - public static class WriteChangedIndexMetaData implements IndexMetaDataAction { - private final IndexMetaData newIndexMetaData; - private final IndexMetaData oldIndexMetaData; - - WriteChangedIndexMetaData(IndexMetaData oldIndexMetaData, IndexMetaData newIndexMetaData) { - this.oldIndexMetaData = oldIndexMetaData; - this.newIndexMetaData = newIndexMetaData; - } - - @Override - public Index getIndex() { - return newIndexMetaData.getIndex(); - } - - @Override - public long execute(AtomicClusterStateWriter writer) throws WriteStateException { - return writer.writeIndex( - "version changed from [" + oldIndexMetaData.getVersion() + "] to [" + newIndexMetaData.getVersion() + "]", - newIndexMetaData); - } - } } diff --git a/server/src/main/java/org/elasticsearch/gateway/IncrementalClusterStateWriter.java b/server/src/main/java/org/elasticsearch/gateway/IncrementalClusterStateWriter.java new file mode 100644 index 00000000000..5facb826a24 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/gateway/IncrementalClusterStateWriter.java @@ -0,0 +1,384 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.gateway; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.Manifest; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.index.Index; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Tracks the metadata written to disk, allowing updated metadata to be written incrementally (i.e. only writing out the changed metadata). + */ +class IncrementalClusterStateWriter { + + private final MetaStateService metaStateService; + + // On master-eligible nodes we call updateClusterState under the Coordinator's mutex; on master-ineligible data nodes we call + // updateClusterState on the (unique) cluster applier thread; on other nodes we never call updateClusterState. In all cases there's + // no need to synchronize access to these fields. + private Manifest previousManifest; + private ClusterState previousClusterState; + private boolean incrementalWrite; + + IncrementalClusterStateWriter(MetaStateService metaStateService, Manifest manifest, ClusterState clusterState) { + this.metaStateService = metaStateService; + this.previousManifest = manifest; + this.previousClusterState = clusterState; + this.incrementalWrite = false; + } + + void setCurrentTerm(long currentTerm) throws WriteStateException { + Manifest manifest = new Manifest(currentTerm, previousManifest.getClusterStateVersion(), previousManifest.getGlobalGeneration(), + new HashMap<>(previousManifest.getIndexGenerations())); + metaStateService.writeManifestAndCleanup("current term changed", manifest); + previousManifest = manifest; + } + + Manifest getPreviousManifest() { + return previousManifest; + } + + ClusterState getPreviousClusterState() { + return previousClusterState; + } + + void setIncrementalWrite(boolean incrementalWrite) { + this.incrementalWrite = incrementalWrite; + } + + /** + * Updates manifest and meta data on disk. + * + * @param newState new {@link ClusterState} + * @param previousState previous {@link ClusterState} + * + * @throws WriteStateException if exception occurs. See also {@link WriteStateException#isDirty()}. 
+ */ + void updateClusterState(ClusterState newState, ClusterState previousState) throws WriteStateException { + MetaData newMetaData = newState.metaData(); + + final AtomicClusterStateWriter writer = new AtomicClusterStateWriter(metaStateService, previousManifest); + long globalStateGeneration = writeGlobalState(writer, newMetaData); + Map indexGenerations = writeIndicesMetadata(writer, newState, previousState); + Manifest manifest = new Manifest(previousManifest.getCurrentTerm(), newState.version(), globalStateGeneration, indexGenerations); + writeManifest(writer, manifest); + + previousManifest = manifest; + previousClusterState = newState; + } + + private void writeManifest(AtomicClusterStateWriter writer, Manifest manifest) throws WriteStateException { + if (manifest.equals(previousManifest) == false) { + writer.writeManifestAndCleanup("changed", manifest); + } + } + + private Map writeIndicesMetadata(AtomicClusterStateWriter writer, ClusterState newState, ClusterState previousState) + throws WriteStateException { + Map previouslyWrittenIndices = previousManifest.getIndexGenerations(); + Set relevantIndices = getRelevantIndices(newState, previousState, previouslyWrittenIndices.keySet()); + + Map newIndices = new HashMap<>(); + + MetaData previousMetaData = incrementalWrite ? previousState.metaData() : null; + Iterable actions = resolveIndexMetaDataActions(previouslyWrittenIndices, relevantIndices, previousMetaData, + newState.metaData()); + + for (IndexMetaDataAction action : actions) { + long generation = action.execute(writer); + newIndices.put(action.getIndex(), generation); + } + + return newIndices; + } + + private long writeGlobalState(AtomicClusterStateWriter writer, MetaData newMetaData) throws WriteStateException { + if (incrementalWrite == false || MetaData.isGlobalStateEquals(previousClusterState.metaData(), newMetaData) == false) { + return writer.writeGlobalState("changed", newMetaData); + } + return previousManifest.getGlobalGeneration(); + } + + + /** + * Returns list of {@link IndexMetaDataAction} for each relevant index. + * For each relevant index there are 3 options: + *
    + *
+ * <ol>
+ * <li>
+ * {@link KeepPreviousGeneration} - index metadata is already stored to disk and index metadata version is not changed, no
+ * action is required.
+ * </li>
+ * <li>
+ * {@link WriteNewIndexMetaData} - there is no index metadata on disk and index metadata for this index should be written.
+ * </li>
+ * <li>
+ * {@link WriteChangedIndexMetaData} - index metadata is already on disk, but index metadata version has changed. Updated
+ * index metadata should be written to disk.
+ * </li>
+ * </ol>
+ * + * @param previouslyWrittenIndices A list of indices for which the state was already written before + * @param relevantIndices The list of indices for which state should potentially be written + * @param previousMetaData The last meta data we know of + * @param newMetaData The new metadata + * @return list of {@link IndexMetaDataAction} for each relevant index. + */ + // exposed for tests + static List resolveIndexMetaDataActions(Map previouslyWrittenIndices, + Set relevantIndices, + MetaData previousMetaData, + MetaData newMetaData) { + List actions = new ArrayList<>(); + for (Index index : relevantIndices) { + IndexMetaData newIndexMetaData = newMetaData.getIndexSafe(index); + IndexMetaData previousIndexMetaData = previousMetaData == null ? null : previousMetaData.index(index); + + if (previouslyWrittenIndices.containsKey(index) == false || previousIndexMetaData == null) { + actions.add(new WriteNewIndexMetaData(newIndexMetaData)); + } else if (previousIndexMetaData.getVersion() != newIndexMetaData.getVersion()) { + actions.add(new WriteChangedIndexMetaData(previousIndexMetaData, newIndexMetaData)); + } else { + actions.add(new KeepPreviousGeneration(index, previouslyWrittenIndices.get(index))); + } + } + return actions; + } + + private static Set getRelevantIndicesOnDataOnlyNode(ClusterState state, ClusterState previousState, Set + previouslyWrittenIndices) { + RoutingNode newRoutingNode = state.getRoutingNodes().node(state.nodes().getLocalNodeId()); + if (newRoutingNode == null) { + throw new IllegalStateException("cluster state does not contain this node - cannot write index meta state"); + } + Set indices = new HashSet<>(); + for (ShardRouting routing : newRoutingNode) { + indices.add(routing.index()); + } + // we have to check the meta data also: closed indices will not appear in the routing table, but we must still write the state if + // we have it written on disk previously + for (IndexMetaData indexMetaData : state.metaData()) { + boolean isOrWasClosed = indexMetaData.getState().equals(IndexMetaData.State.CLOSE); + // if the index is open we might still have to write the state if it just transitioned from closed to open + // so we have to check for that as well. 
+ IndexMetaData previousMetaData = previousState.metaData().index(indexMetaData.getIndex()); + if (previousMetaData != null) { + isOrWasClosed = isOrWasClosed || previousMetaData.getState().equals(IndexMetaData.State.CLOSE); + } + if (previouslyWrittenIndices.contains(indexMetaData.getIndex()) && isOrWasClosed) { + indices.add(indexMetaData.getIndex()); + } + } + return indices; + } + + private static Set getRelevantIndicesForMasterEligibleNode(ClusterState state) { + Set relevantIndices = new HashSet<>(); + // we have to iterate over the metadata to make sure we also capture closed indices + for (IndexMetaData indexMetaData : state.metaData()) { + relevantIndices.add(indexMetaData.getIndex()); + } + return relevantIndices; + } + + // exposed for tests + static Set getRelevantIndices(ClusterState state, ClusterState previousState, Set previouslyWrittenIndices) { + Set relevantIndices; + if (isDataOnlyNode(state)) { + relevantIndices = getRelevantIndicesOnDataOnlyNode(state, previousState, previouslyWrittenIndices); + } else if (state.nodes().getLocalNode().isMasterNode()) { + relevantIndices = getRelevantIndicesForMasterEligibleNode(state); + } else { + relevantIndices = Collections.emptySet(); + } + return relevantIndices; + } + + private static boolean isDataOnlyNode(ClusterState state) { + return state.nodes().getLocalNode().isMasterNode() == false && state.nodes().getLocalNode().isDataNode(); + } + + /** + * Action to perform with index metadata. + */ + interface IndexMetaDataAction { + /** + * @return index for index metadata. + */ + Index getIndex(); + + /** + * Executes this action using provided {@link AtomicClusterStateWriter}. + * + * @return new index metadata state generation, to be used in manifest file. + * @throws WriteStateException if exception occurs. + */ + long execute(AtomicClusterStateWriter writer) throws WriteStateException; + } + + /** + * This class is used to write changed global {@link MetaData}, {@link IndexMetaData} and {@link Manifest} to disk. + * This class delegates write* calls to corresponding write calls in {@link MetaStateService} and + * additionally it keeps track of cleanup actions to be performed if transaction succeeds or fails. 
+ */ + static class AtomicClusterStateWriter { + private static final String FINISHED_MSG = "AtomicClusterStateWriter is finished"; + private final List commitCleanupActions; + private final List rollbackCleanupActions; + private final Manifest previousManifest; + private final MetaStateService metaStateService; + private boolean finished; + + AtomicClusterStateWriter(MetaStateService metaStateService, Manifest previousManifest) { + this.metaStateService = metaStateService; + assert previousManifest != null; + this.previousManifest = previousManifest; + this.commitCleanupActions = new ArrayList<>(); + this.rollbackCleanupActions = new ArrayList<>(); + this.finished = false; + } + + long writeGlobalState(String reason, MetaData metaData) throws WriteStateException { + assert finished == false : FINISHED_MSG; + try { + rollbackCleanupActions.add(() -> metaStateService.cleanupGlobalState(previousManifest.getGlobalGeneration())); + long generation = metaStateService.writeGlobalState(reason, metaData); + commitCleanupActions.add(() -> metaStateService.cleanupGlobalState(generation)); + return generation; + } catch (WriteStateException e) { + rollback(); + throw e; + } + } + + long writeIndex(String reason, IndexMetaData metaData) throws WriteStateException { + assert finished == false : FINISHED_MSG; + try { + Index index = metaData.getIndex(); + Long previousGeneration = previousManifest.getIndexGenerations().get(index); + if (previousGeneration != null) { + // we prefer not to clean-up index metadata in case of rollback, + // if it's not referenced by previous manifest file + // not to break dangling indices functionality + rollbackCleanupActions.add(() -> metaStateService.cleanupIndex(index, previousGeneration)); + } + long generation = metaStateService.writeIndex(reason, metaData); + commitCleanupActions.add(() -> metaStateService.cleanupIndex(index, generation)); + return generation; + } catch (WriteStateException e) { + rollback(); + throw e; + } + } + + void writeManifestAndCleanup(String reason, Manifest manifest) throws WriteStateException { + assert finished == false : FINISHED_MSG; + try { + metaStateService.writeManifestAndCleanup(reason, manifest); + commitCleanupActions.forEach(Runnable::run); + finished = true; + } catch (WriteStateException e) { + // If the Manifest write results in a dirty WriteStateException it's not safe to roll back, removing the new metadata files, + // because if the Manifest was actually written to disk and its deletion fails it will reference these new metadata files. + // On master-eligible nodes a dirty WriteStateException here is fatal to the node since we no longer really have any idea + // what the state on disk is and the only sensible response is to start again from scratch. 
+ if (e.isDirty() == false) { + rollback(); + } + throw e; + } + } + + void rollback() { + rollbackCleanupActions.forEach(Runnable::run); + finished = true; + } + } + + static class KeepPreviousGeneration implements IndexMetaDataAction { + private final Index index; + private final long generation; + + KeepPreviousGeneration(Index index, long generation) { + this.index = index; + this.generation = generation; + } + + @Override + public Index getIndex() { + return index; + } + + @Override + public long execute(AtomicClusterStateWriter writer) { + return generation; + } + } + + static class WriteNewIndexMetaData implements IndexMetaDataAction { + private final IndexMetaData indexMetaData; + + WriteNewIndexMetaData(IndexMetaData indexMetaData) { + this.indexMetaData = indexMetaData; + } + + @Override + public Index getIndex() { + return indexMetaData.getIndex(); + } + + @Override + public long execute(AtomicClusterStateWriter writer) throws WriteStateException { + return writer.writeIndex("freshly created", indexMetaData); + } + } + + static class WriteChangedIndexMetaData implements IndexMetaDataAction { + private final IndexMetaData newIndexMetaData; + private final IndexMetaData oldIndexMetaData; + + WriteChangedIndexMetaData(IndexMetaData oldIndexMetaData, IndexMetaData newIndexMetaData) { + this.oldIndexMetaData = oldIndexMetaData; + this.newIndexMetaData = newIndexMetaData; + } + + @Override + public Index getIndex() { + return newIndexMetaData.getIndex(); + } + + @Override + public long execute(AtomicClusterStateWriter writer) throws WriteStateException { + return writer.writeIndex( + "version changed from [" + oldIndexMetaData.getVersion() + "] to [" + newIndexMetaData.getVersion() + "]", + newIndexMetaData); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index efa7ddcd657..feb35a91283 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -482,7 +482,7 @@ public class Node implements Closeable { ).collect(Collectors.toSet()); final TransportService transportService = newTransportService(settings, transport, threadPool, networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders); - final GatewayMetaState gatewayMetaState = new GatewayMetaState(settings, metaStateService); + final GatewayMetaState gatewayMetaState = new GatewayMetaState(); final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService); final SearchTransportService searchTransportService = new SearchTransportService(transportService, SearchExecutionStatsCollector.makeWrapper(responseCollectorService)); @@ -700,7 +700,7 @@ public class Node implements Closeable { // Load (and maybe upgrade) the metadata stored on disk final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class); - gatewayMetaState.start(transportService, clusterService, + gatewayMetaState.start(settings(), transportService, clusterService, injector.getInstance(MetaStateService.class), injector.getInstance(MetaDataIndexUpgradeService.class), injector.getInstance(MetaDataUpgrader.class)); // we load the global state here (the persistent part of the cluster state stored on disk) to // pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state. 
diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java index 107cc7541fe..e723d08d735 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -24,6 +24,8 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.coordination.CoordinationMetaData; import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfigExclusion; +import org.elasticsearch.cluster.coordination.CoordinationState; +import org.elasticsearch.cluster.coordination.InMemoryPersistedState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.Manifest; import org.elasticsearch.cluster.metadata.MetaData; @@ -35,10 +37,10 @@ import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.test.ESTestCase; -import java.io.IOException; import java.util.Collections; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.not; public class GatewayMetaStatePersistedStateTests extends ESTestCase { @@ -63,21 +65,23 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { super.tearDown(); } - private MockGatewayMetaState newGateway() { - final MockGatewayMetaState gateway = new MockGatewayMetaState(settings, nodeEnvironment, xContentRegistry(), localNode); - gateway.start(); - return gateway; + private CoordinationState.PersistedState newGatewayPersistedState() { + final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode); + gateway.start(settings, nodeEnvironment, xContentRegistry()); + final CoordinationState.PersistedState persistedState = gateway.getPersistedState(); + assertThat(persistedState, not(instanceOf(InMemoryPersistedState.class))); + return persistedState; } - private MockGatewayMetaState maybeNew(MockGatewayMetaState gateway) throws IOException { + private CoordinationState.PersistedState maybeNew(CoordinationState.PersistedState persistedState) { if (randomBoolean()) { - return newGateway(); + return newGatewayPersistedState(); } - return gateway; + return persistedState; } - public void testInitialState() throws IOException { - MockGatewayMetaState gateway = newGateway(); + public void testInitialState() { + CoordinationState.PersistedState gateway = newGatewayPersistedState(); ClusterState state = gateway.getLastAcceptedState(); assertThat(state.getClusterName(), equalTo(clusterName)); assertTrue(MetaData.isGlobalStateEquals(state.metaData(), MetaData.EMPTY_META_DATA)); @@ -88,8 +92,8 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { assertThat(currentTerm, equalTo(Manifest.empty().getCurrentTerm())); } - public void testSetCurrentTerm() throws IOException { - MockGatewayMetaState gateway = newGateway(); + public void testSetCurrentTerm() { + CoordinationState.PersistedState gateway = newGatewayPersistedState(); for (int i = 0; i < randomIntBetween(1, 5); i++) { final long currentTerm = randomNonNegativeLong(); @@ -142,8 +146,8 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { } } - public void testSetLastAcceptedState() throws IOException { - MockGatewayMetaState gateway = newGateway(); + public void 
testSetLastAcceptedState() { + CoordinationState.PersistedState gateway = newGatewayPersistedState(); final long term = randomNonNegativeLong(); for (int i = 0; i < randomIntBetween(1, 5); i++) { @@ -165,8 +169,8 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { } } - public void testSetLastAcceptedStateTermChanged() throws IOException { - MockGatewayMetaState gateway = newGateway(); + public void testSetLastAcceptedStateTermChanged() { + CoordinationState.PersistedState gateway = newGatewayPersistedState(); final String indexName = randomAlphaOfLength(10); final int numberOfShards = randomIntBetween(1, 5); @@ -178,7 +182,7 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { gateway.setLastAcceptedState(state); gateway = maybeNew(gateway); - final long newTerm = randomValueOtherThan(term, () -> randomNonNegativeLong()); + final long newTerm = randomValueOtherThan(term, ESTestCase::randomNonNegativeLong); final int newNumberOfShards = randomValueOtherThan(numberOfShards, () -> randomIntBetween(1,5)); final IndexMetaData newIndexMetaData = createIndexMetaData(indexName, newNumberOfShards, version); final ClusterState newClusterState = createClusterState(randomNonNegativeLong(), @@ -189,11 +193,11 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { assertThat(gateway.getLastAcceptedState().metaData().index(indexName), equalTo(newIndexMetaData)); } - public void testCurrentTermAndTermAreDifferent() throws IOException { - MockGatewayMetaState gateway = newGateway(); + public void testCurrentTermAndTermAreDifferent() { + CoordinationState.PersistedState gateway = newGatewayPersistedState(); long currentTerm = randomNonNegativeLong(); - long term = randomValueOtherThan(currentTerm, () -> randomNonNegativeLong()); + long term = randomValueOtherThan(currentTerm, ESTestCase::randomNonNegativeLong); gateway.setCurrentTerm(currentTerm); gateway.setLastAcceptedState(createClusterState(randomNonNegativeLong(), @@ -204,8 +208,8 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { assertThat(gateway.getLastAcceptedState().coordinationMetaData().term(), equalTo(term)); } - public void testMarkAcceptedConfigAsCommitted() throws IOException { - MockGatewayMetaState gateway = newGateway(); + public void testMarkAcceptedConfigAsCommitted() { + CoordinationState.PersistedState gateway = newGatewayPersistedState(); //generate random coordinationMetaData with different lastAcceptedConfiguration and lastCommittedConfiguration CoordinationMetaData coordinationMetaData; diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java index c8f274c2f18..d0101f276d8 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStateTests.java @@ -19,417 +19,24 @@ package org.elasticsearch.gateway; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.MockDirectoryWrapper; import org.elasticsearch.Version; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; -import org.elasticsearch.cluster.metadata.Manifest; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService; -import 
org.elasticsearch.cluster.node.DiscoveryNodeRole; -import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.allocation.AllocationService; -import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; -import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.env.NodeEnvironment; -import org.elasticsearch.index.Index; import org.elasticsearch.plugins.MetaDataUpgrader; +import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.TestCustomMetaData; -import org.mockito.ArgumentCaptor; -import java.io.IOException; -import java.nio.file.Path; import java.util.Arrays; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyZeroInteractions; -import static org.mockito.Mockito.when; -public class GatewayMetaStateTests extends ESAllocationTestCase { - - private ClusterState noIndexClusterState(boolean masterEligible) { - MetaData metaData = MetaData.builder().build(); - RoutingTable routingTable = RoutingTable.builder().build(); - - return ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) - .metaData(metaData) - .routingTable(routingTable) - .nodes(generateDiscoveryNodes(masterEligible)) - .build(); - } - - private ClusterState clusterStateWithUnassignedIndex(IndexMetaData indexMetaData, boolean masterEligible) { - MetaData metaData = MetaData.builder() - .put(indexMetaData, false) - .build(); - - RoutingTable routingTable = RoutingTable.builder() - .addAsNew(metaData.index("test")) - .build(); - - return ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) - .metaData(metaData) - .routingTable(routingTable) - .nodes(generateDiscoveryNodes(masterEligible)) - .build(); - } - - private ClusterState clusterStateWithAssignedIndex(IndexMetaData indexMetaData, boolean masterEligible) { - AllocationService strategy = createAllocationService(Settings.builder() - .put("cluster.routing.allocation.node_concurrent_recoveries", 100) - .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") - .put("cluster.routing.allocation.cluster_concurrent_rebalance", 100) - .put("cluster.routing.allocation.node_initial_primaries_recoveries", 100) - .build()); - - ClusterState oldClusterState = clusterStateWithUnassignedIndex(indexMetaData, masterEligible); - RoutingTable routingTable = strategy.reroute(oldClusterState, "reroute").routingTable(); - - MetaData metaDataNewClusterState = MetaData.builder() - .put(oldClusterState.metaData().index("test"), false) - .build(); - - return ClusterState.builder(oldClusterState).routingTable(routingTable) - 
.metaData(metaDataNewClusterState).version(oldClusterState.getVersion() + 1).build(); - } - - private ClusterState clusterStateWithClosedIndex(IndexMetaData indexMetaData, boolean masterEligible) { - ClusterState oldClusterState = clusterStateWithAssignedIndex(indexMetaData, masterEligible); - - MetaData metaDataNewClusterState = MetaData.builder() - .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).state(IndexMetaData.State.CLOSE) - .numberOfShards(5).numberOfReplicas(2)) - .version(oldClusterState.metaData().version() + 1) - .build(); - RoutingTable routingTable = RoutingTable.builder() - .addAsNew(metaDataNewClusterState.index("test")) - .build(); - - return ClusterState.builder(oldClusterState).routingTable(routingTable) - .metaData(metaDataNewClusterState).version(oldClusterState.getVersion() + 1).build(); - } - - private ClusterState clusterStateWithJustOpenedIndex(IndexMetaData indexMetaData, boolean masterEligible) { - ClusterState oldClusterState = clusterStateWithClosedIndex(indexMetaData, masterEligible); - - MetaData metaDataNewClusterState = MetaData.builder() - .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).state(IndexMetaData.State.OPEN) - .numberOfShards(5).numberOfReplicas(2)) - .version(oldClusterState.metaData().version() + 1) - .build(); - - return ClusterState.builder(oldClusterState) - .metaData(metaDataNewClusterState).version(oldClusterState.getVersion() + 1).build(); - } - - private DiscoveryNodes.Builder generateDiscoveryNodes(boolean masterEligible) { - Set dataOnlyRoles = Collections.singleton(DiscoveryNodeRole.DATA_ROLE); - return DiscoveryNodes.builder().add(newNode("node1", masterEligible ? MASTER_DATA_ROLES : dataOnlyRoles)) - .add(newNode("master_node", MASTER_DATA_ROLES)).localNodeId("node1").masterNodeId(masterEligible ? "node1" : "master_node"); - } - - private Set randomPrevWrittenIndices(IndexMetaData indexMetaData) { - if (randomBoolean()) { - return Collections.singleton(indexMetaData.getIndex()); - } else { - return Collections.emptySet(); - } - } - - private IndexMetaData createIndexMetaData(String name) { - return IndexMetaData.builder(name). - settings(settings(Version.CURRENT)). - numberOfShards(5). - numberOfReplicas(2). 
- build(); - } - - public void testGetRelevantIndicesWithUnassignedShardsOnMasterEligibleNode() { - IndexMetaData indexMetaData = createIndexMetaData("test"); - Set indices = GatewayMetaState.getRelevantIndices( - clusterStateWithUnassignedIndex(indexMetaData, true), - noIndexClusterState(true), - randomPrevWrittenIndices(indexMetaData)); - assertThat(indices.size(), equalTo(1)); - } - - public void testGetRelevantIndicesWithUnassignedShardsOnDataOnlyNode() { - IndexMetaData indexMetaData = createIndexMetaData("test"); - Set indices = GatewayMetaState.getRelevantIndices( - clusterStateWithUnassignedIndex(indexMetaData, false), - noIndexClusterState(false), - randomPrevWrittenIndices(indexMetaData)); - assertThat(indices.size(), equalTo(0)); - } - - public void testGetRelevantIndicesWithAssignedShards() { - IndexMetaData indexMetaData = createIndexMetaData("test"); - boolean masterEligible = randomBoolean(); - Set indices = GatewayMetaState.getRelevantIndices( - clusterStateWithAssignedIndex(indexMetaData, masterEligible), - clusterStateWithUnassignedIndex(indexMetaData, masterEligible), - randomPrevWrittenIndices(indexMetaData)); - assertThat(indices.size(), equalTo(1)); - } - - public void testGetRelevantIndicesForClosedPrevWrittenIndexOnDataOnlyNode() { - IndexMetaData indexMetaData = createIndexMetaData("test"); - Set indices = GatewayMetaState.getRelevantIndices( - clusterStateWithClosedIndex(indexMetaData, false), - clusterStateWithAssignedIndex(indexMetaData, false), - Collections.singleton(indexMetaData.getIndex())); - assertThat(indices.size(), equalTo(1)); - } - - public void testGetRelevantIndicesForClosedPrevNotWrittenIndexOnDataOnlyNode() { - IndexMetaData indexMetaData = createIndexMetaData("test"); - Set indices = GatewayMetaState.getRelevantIndices( - clusterStateWithJustOpenedIndex(indexMetaData, false), - clusterStateWithClosedIndex(indexMetaData, false), - Collections.emptySet()); - assertThat(indices.size(), equalTo(0)); - } - - public void testGetRelevantIndicesForWasClosedPrevWrittenIndexOnDataOnlyNode() { - IndexMetaData indexMetaData = createIndexMetaData("test"); - Set indices = GatewayMetaState.getRelevantIndices( - clusterStateWithJustOpenedIndex(indexMetaData, false), - clusterStateWithClosedIndex(indexMetaData, false), - Collections.singleton(indexMetaData.getIndex())); - assertThat(indices.size(), equalTo(1)); - } - - public void testResolveStatesToBeWritten() throws WriteStateException { - Map indices = new HashMap<>(); - Set relevantIndices = new HashSet<>(); - - IndexMetaData removedIndex = createIndexMetaData("removed_index"); - indices.put(removedIndex.getIndex(), 1L); - - IndexMetaData versionChangedIndex = createIndexMetaData("version_changed_index"); - indices.put(versionChangedIndex.getIndex(), 2L); - relevantIndices.add(versionChangedIndex.getIndex()); - - IndexMetaData notChangedIndex = createIndexMetaData("not_changed_index"); - indices.put(notChangedIndex.getIndex(), 3L); - relevantIndices.add(notChangedIndex.getIndex()); - - IndexMetaData newIndex = createIndexMetaData("new_index"); - relevantIndices.add(newIndex.getIndex()); - - MetaData oldMetaData = MetaData.builder() - .put(removedIndex, false) - .put(versionChangedIndex, false) - .put(notChangedIndex, false) - .build(); - - MetaData newMetaData = MetaData.builder() - .put(versionChangedIndex, true) - .put(notChangedIndex, false) - .put(newIndex, false) - .build(); - - IndexMetaData newVersionChangedIndex = newMetaData.index(versionChangedIndex.getIndex()); - - List actions = - 
GatewayMetaState.resolveIndexMetaDataActions(indices, relevantIndices, oldMetaData, newMetaData); - - assertThat(actions, hasSize(3)); - - for (GatewayMetaState.IndexMetaDataAction action : actions) { - if (action instanceof GatewayMetaState.KeepPreviousGeneration) { - assertThat(action.getIndex(), equalTo(notChangedIndex.getIndex())); - GatewayMetaState.AtomicClusterStateWriter writer = mock(GatewayMetaState.AtomicClusterStateWriter.class); - assertThat(action.execute(writer), equalTo(3L)); - verifyZeroInteractions(writer); - } - if (action instanceof GatewayMetaState.WriteNewIndexMetaData) { - assertThat(action.getIndex(), equalTo(newIndex.getIndex())); - GatewayMetaState.AtomicClusterStateWriter writer = mock(GatewayMetaState.AtomicClusterStateWriter.class); - when(writer.writeIndex("freshly created", newIndex)).thenReturn(0L); - assertThat(action.execute(writer), equalTo(0L)); - } - if (action instanceof GatewayMetaState.WriteChangedIndexMetaData) { - assertThat(action.getIndex(), equalTo(newVersionChangedIndex.getIndex())); - GatewayMetaState.AtomicClusterStateWriter writer = mock(GatewayMetaState.AtomicClusterStateWriter.class); - when(writer.writeIndex(anyString(), eq(newVersionChangedIndex))).thenReturn(3L); - assertThat(action.execute(writer), equalTo(3L)); - ArgumentCaptor reason = ArgumentCaptor.forClass(String.class); - verify(writer).writeIndex(reason.capture(), eq(newVersionChangedIndex)); - assertThat(reason.getValue(), containsString(Long.toString(versionChangedIndex.getVersion()))); - assertThat(reason.getValue(), containsString(Long.toString(newVersionChangedIndex.getVersion()))); - } - } - } - - private static class MetaStateServiceWithFailures extends MetaStateService { - private final int invertedFailRate; - private boolean failRandomly; - - private MetaDataStateFormat wrap(MetaDataStateFormat format) { - return new MetaDataStateFormat(format.getPrefix()) { - @Override - public void toXContent(XContentBuilder builder, T state) throws IOException { - format.toXContent(builder, state); - } - - @Override - public T fromXContent(XContentParser parser) throws IOException { - return format.fromXContent(parser); - } - - @Override - protected Directory newDirectory(Path dir) { - MockDirectoryWrapper mock = newMockFSDirectory(dir); - if (failRandomly) { - MockDirectoryWrapper.Failure fail = new MockDirectoryWrapper.Failure() { - @Override - public void eval(MockDirectoryWrapper dir) throws IOException { - int r = randomIntBetween(0, invertedFailRate); - if (r == 0) { - throw new MockDirectoryWrapper.FakeIOException(); - } - } - }; - mock.failOn(fail); - } - closeAfterSuite(mock); - return mock; - } - }; - } - - MetaStateServiceWithFailures(int invertedFailRate, NodeEnvironment nodeEnv, NamedXContentRegistry namedXContentRegistry) { - super(nodeEnv, namedXContentRegistry); - META_DATA_FORMAT = wrap(MetaData.FORMAT); - INDEX_META_DATA_FORMAT = wrap(IndexMetaData.FORMAT); - MANIFEST_FORMAT = wrap(Manifest.FORMAT); - failRandomly = false; - this.invertedFailRate = invertedFailRate; - } - - void failRandomly() { - failRandomly = true; - } - - void noFailures() { - failRandomly = false; - } - } - - private boolean metaDataEquals(MetaData md1, MetaData md2) { - boolean equals = MetaData.isGlobalStateEquals(md1, md2); - - for (IndexMetaData imd : md1) { - IndexMetaData imd2 = md2.index(imd.getIndex()); - equals = equals && imd.equals(imd2); - } - - for (IndexMetaData imd : md2) { - IndexMetaData imd2 = md1.index(imd.getIndex()); - equals = equals && imd.equals(imd2); - } - return 
equals; - } - - private static MetaData randomMetaDataForTx() { - int settingNo = randomIntBetween(0, 10); - MetaData.Builder builder = MetaData.builder() - .persistentSettings(Settings.builder().put("setting" + settingNo, randomAlphaOfLength(5)).build()); - int numOfIndices = randomIntBetween(0, 3); - - for (int i = 0; i < numOfIndices; i++) { - int indexNo = randomIntBetween(0, 50); - IndexMetaData indexMetaData = IndexMetaData.builder("index" + indexNo).settings( - Settings.builder() - .put(IndexMetaData.SETTING_INDEX_UUID, "index" + indexNo) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - .build() - ).build(); - builder.put(indexMetaData, false); - } - return builder.build(); - } - - public void testAtomicityWithFailures() throws IOException { - try (NodeEnvironment env = newNodeEnvironment()) { - MetaStateServiceWithFailures metaStateService = - new MetaStateServiceWithFailures(randomIntBetween(100, 1000), env, xContentRegistry()); - - // We only guarantee atomicity of writes, if there is initial Manifest file - Manifest manifest = Manifest.empty(); - MetaData metaData = MetaData.EMPTY_META_DATA; - metaStateService.writeManifestAndCleanup("startup", Manifest.empty()); - long currentTerm = randomNonNegativeLong(); - long clusterStateVersion = randomNonNegativeLong(); - - metaStateService.failRandomly(); - Set possibleMetaData = new HashSet<>(); - possibleMetaData.add(metaData); - - for (int i = 0; i < randomIntBetween(1, 5); i++) { - GatewayMetaState.AtomicClusterStateWriter writer = - new GatewayMetaState.AtomicClusterStateWriter(metaStateService, manifest); - metaData = randomMetaDataForTx(); - Map indexGenerations = new HashMap<>(); - - try { - long globalGeneration = writer.writeGlobalState("global", metaData); - - for (IndexMetaData indexMetaData : metaData) { - long generation = writer.writeIndex("index", indexMetaData); - indexGenerations.put(indexMetaData.getIndex(), generation); - } - - Manifest newManifest = new Manifest(currentTerm, clusterStateVersion, globalGeneration, indexGenerations); - writer.writeManifestAndCleanup("manifest", newManifest); - possibleMetaData.clear(); - possibleMetaData.add(metaData); - manifest = newManifest; - } catch (WriteStateException e) { - if (e.isDirty()) { - possibleMetaData.add(metaData); - /* - * If dirty WriteStateException occurred, it's only safe to proceed if there is subsequent - * successful write of metadata and Manifest. We prefer to break here, not to over complicate test logic. - * See also MetaDataStateFormat#testFailRandomlyAndReadAnyState, that does not break. 
- */ - break; - } - } - } - - metaStateService.noFailures(); - - Tuple manifestAndMetaData = metaStateService.loadFullState(); - MetaData loadedMetaData = manifestAndMetaData.v2(); - - assertTrue(possibleMetaData.stream().anyMatch(md -> metaDataEquals(md, loadedMetaData))); - } - } +public class GatewayMetaStateTests extends ESTestCase { public void testAddCustomMetaDataOnUpgrade() throws Exception { MetaData metaData = randomMetaData(); diff --git a/server/src/test/java/org/elasticsearch/gateway/IncrementalClusterStateWriterTests.java b/server/src/test/java/org/elasticsearch/gateway/IncrementalClusterStateWriterTests.java new file mode 100644 index 00000000000..b41a24bb820 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/gateway/IncrementalClusterStateWriterTests.java @@ -0,0 +1,429 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.gateway; + +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.MockDirectoryWrapper; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ESAllocationTestCase; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.Manifest; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodeRole; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; +import org.elasticsearch.common.collect.Tuple; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.Index; +import org.mockito.ArgumentCaptor; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +public class IncrementalClusterStateWriterTests extends ESAllocationTestCase { + + private ClusterState 
noIndexClusterState(boolean masterEligible) { + MetaData metaData = MetaData.builder().build(); + RoutingTable routingTable = RoutingTable.builder().build(); + + return ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metaData(metaData) + .routingTable(routingTable) + .nodes(generateDiscoveryNodes(masterEligible)) + .build(); + } + + private ClusterState clusterStateWithUnassignedIndex(IndexMetaData indexMetaData, boolean masterEligible) { + MetaData metaData = MetaData.builder() + .put(indexMetaData, false) + .build(); + + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metaData.index("test")) + .build(); + + return ClusterState.builder(org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metaData(metaData) + .routingTable(routingTable) + .nodes(generateDiscoveryNodes(masterEligible)) + .build(); + } + + private ClusterState clusterStateWithAssignedIndex(IndexMetaData indexMetaData, boolean masterEligible) { + AllocationService strategy = createAllocationService(Settings.builder() + .put("cluster.routing.allocation.node_concurrent_recoveries", 100) + .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") + .put("cluster.routing.allocation.cluster_concurrent_rebalance", 100) + .put("cluster.routing.allocation.node_initial_primaries_recoveries", 100) + .build()); + + ClusterState oldClusterState = clusterStateWithUnassignedIndex(indexMetaData, masterEligible); + RoutingTable routingTable = strategy.reroute(oldClusterState, "reroute").routingTable(); + + MetaData metaDataNewClusterState = MetaData.builder() + .put(oldClusterState.metaData().index("test"), false) + .build(); + + return ClusterState.builder(oldClusterState).routingTable(routingTable) + .metaData(metaDataNewClusterState).version(oldClusterState.getVersion() + 1).build(); + } + + private ClusterState clusterStateWithClosedIndex(IndexMetaData indexMetaData, boolean masterEligible) { + ClusterState oldClusterState = clusterStateWithAssignedIndex(indexMetaData, masterEligible); + + MetaData metaDataNewClusterState = MetaData.builder() + .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).state(IndexMetaData.State.CLOSE) + .numberOfShards(5).numberOfReplicas(2)) + .version(oldClusterState.metaData().version() + 1) + .build(); + RoutingTable routingTable = RoutingTable.builder() + .addAsNew(metaDataNewClusterState.index("test")) + .build(); + + return ClusterState.builder(oldClusterState).routingTable(routingTable) + .metaData(metaDataNewClusterState).version(oldClusterState.getVersion() + 1).build(); + } + + private ClusterState clusterStateWithJustOpenedIndex(IndexMetaData indexMetaData, boolean masterEligible) { + ClusterState oldClusterState = clusterStateWithClosedIndex(indexMetaData, masterEligible); + + MetaData metaDataNewClusterState = MetaData.builder() + .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).state(IndexMetaData.State.OPEN) + .numberOfShards(5).numberOfReplicas(2)) + .version(oldClusterState.metaData().version() + 1) + .build(); + + return ClusterState.builder(oldClusterState) + .metaData(metaDataNewClusterState).version(oldClusterState.getVersion() + 1).build(); + } + + private DiscoveryNodes.Builder generateDiscoveryNodes(boolean masterEligible) { + Set dataOnlyRoles = Collections.singleton(DiscoveryNodeRole.DATA_ROLE); + return DiscoveryNodes.builder().add(newNode("node1", masterEligible ? 
MASTER_DATA_ROLES : dataOnlyRoles)) + .add(newNode("master_node", MASTER_DATA_ROLES)).localNodeId("node1").masterNodeId(masterEligible ? "node1" : "master_node"); + } + + private Set randomPrevWrittenIndices(IndexMetaData indexMetaData) { + if (randomBoolean()) { + return Collections.singleton(indexMetaData.getIndex()); + } else { + return Collections.emptySet(); + } + } + + private IndexMetaData createIndexMetaData(String name) { + return IndexMetaData.builder(name). + settings(settings(Version.CURRENT)). + numberOfShards(5). + numberOfReplicas(2). + build(); + } + + public void testGetRelevantIndicesWithUnassignedShardsOnMasterEligibleNode() { + IndexMetaData indexMetaData = createIndexMetaData("test"); + Set indices = IncrementalClusterStateWriter.getRelevantIndices( + clusterStateWithUnassignedIndex(indexMetaData, true), + noIndexClusterState(true), + randomPrevWrittenIndices(indexMetaData)); + assertThat(indices.size(), equalTo(1)); + } + + public void testGetRelevantIndicesWithUnassignedShardsOnDataOnlyNode() { + IndexMetaData indexMetaData = createIndexMetaData("test"); + Set indices = IncrementalClusterStateWriter.getRelevantIndices( + clusterStateWithUnassignedIndex(indexMetaData, false), + noIndexClusterState(false), + randomPrevWrittenIndices(indexMetaData)); + assertThat(indices.size(), equalTo(0)); + } + + public void testGetRelevantIndicesWithAssignedShards() { + IndexMetaData indexMetaData = createIndexMetaData("test"); + boolean masterEligible = randomBoolean(); + Set indices = IncrementalClusterStateWriter.getRelevantIndices( + clusterStateWithAssignedIndex(indexMetaData, masterEligible), + clusterStateWithUnassignedIndex(indexMetaData, masterEligible), + randomPrevWrittenIndices(indexMetaData)); + assertThat(indices.size(), equalTo(1)); + } + + public void testGetRelevantIndicesForClosedPrevWrittenIndexOnDataOnlyNode() { + IndexMetaData indexMetaData = createIndexMetaData("test"); + Set indices = IncrementalClusterStateWriter.getRelevantIndices( + clusterStateWithClosedIndex(indexMetaData, false), + clusterStateWithAssignedIndex(indexMetaData, false), + Collections.singleton(indexMetaData.getIndex())); + assertThat(indices.size(), equalTo(1)); + } + + public void testGetRelevantIndicesForClosedPrevNotWrittenIndexOnDataOnlyNode() { + IndexMetaData indexMetaData = createIndexMetaData("test"); + Set indices = IncrementalClusterStateWriter.getRelevantIndices( + clusterStateWithJustOpenedIndex(indexMetaData, false), + clusterStateWithClosedIndex(indexMetaData, false), + Collections.emptySet()); + assertThat(indices.size(), equalTo(0)); + } + + public void testGetRelevantIndicesForWasClosedPrevWrittenIndexOnDataOnlyNode() { + IndexMetaData indexMetaData = createIndexMetaData("test"); + Set indices = IncrementalClusterStateWriter.getRelevantIndices( + clusterStateWithJustOpenedIndex(indexMetaData, false), + clusterStateWithClosedIndex(indexMetaData, false), + Collections.singleton(indexMetaData.getIndex())); + assertThat(indices.size(), equalTo(1)); + } + + public void testResolveStatesToBeWritten() throws WriteStateException { + Map indices = new HashMap<>(); + Set relevantIndices = new HashSet<>(); + + IndexMetaData removedIndex = createIndexMetaData("removed_index"); + indices.put(removedIndex.getIndex(), 1L); + + IndexMetaData versionChangedIndex = createIndexMetaData("version_changed_index"); + indices.put(versionChangedIndex.getIndex(), 2L); + relevantIndices.add(versionChangedIndex.getIndex()); + + IndexMetaData notChangedIndex = createIndexMetaData("not_changed_index"); 
+ indices.put(notChangedIndex.getIndex(), 3L); + relevantIndices.add(notChangedIndex.getIndex()); + + IndexMetaData newIndex = createIndexMetaData("new_index"); + relevantIndices.add(newIndex.getIndex()); + + MetaData oldMetaData = MetaData.builder() + .put(removedIndex, false) + .put(versionChangedIndex, false) + .put(notChangedIndex, false) + .build(); + + MetaData newMetaData = MetaData.builder() + .put(versionChangedIndex, true) + .put(notChangedIndex, false) + .put(newIndex, false) + .build(); + + IndexMetaData newVersionChangedIndex = newMetaData.index(versionChangedIndex.getIndex()); + + List actions = + IncrementalClusterStateWriter.resolveIndexMetaDataActions(indices, relevantIndices, oldMetaData, newMetaData); + + assertThat(actions, hasSize(3)); + + for (IncrementalClusterStateWriter.IndexMetaDataAction action : actions) { + if (action instanceof IncrementalClusterStateWriter.KeepPreviousGeneration) { + assertThat(action.getIndex(), equalTo(notChangedIndex.getIndex())); + IncrementalClusterStateWriter.AtomicClusterStateWriter writer + = mock(IncrementalClusterStateWriter.AtomicClusterStateWriter.class); + assertThat(action.execute(writer), equalTo(3L)); + verifyZeroInteractions(writer); + } + if (action instanceof IncrementalClusterStateWriter.WriteNewIndexMetaData) { + assertThat(action.getIndex(), equalTo(newIndex.getIndex())); + IncrementalClusterStateWriter.AtomicClusterStateWriter writer + = mock(IncrementalClusterStateWriter.AtomicClusterStateWriter.class); + when(writer.writeIndex("freshly created", newIndex)).thenReturn(0L); + assertThat(action.execute(writer), equalTo(0L)); + } + if (action instanceof IncrementalClusterStateWriter.WriteChangedIndexMetaData) { + assertThat(action.getIndex(), equalTo(newVersionChangedIndex.getIndex())); + IncrementalClusterStateWriter.AtomicClusterStateWriter writer + = mock(IncrementalClusterStateWriter.AtomicClusterStateWriter.class); + when(writer.writeIndex(anyString(), eq(newVersionChangedIndex))).thenReturn(3L); + assertThat(action.execute(writer), equalTo(3L)); + ArgumentCaptor reason = ArgumentCaptor.forClass(String.class); + verify(writer).writeIndex(reason.capture(), eq(newVersionChangedIndex)); + assertThat(reason.getValue(), containsString(Long.toString(versionChangedIndex.getVersion()))); + assertThat(reason.getValue(), containsString(Long.toString(newVersionChangedIndex.getVersion()))); + } + } + } + + private static class MetaStateServiceWithFailures extends MetaStateService { + private final int invertedFailRate; + private boolean failRandomly; + + private MetaDataStateFormat wrap(MetaDataStateFormat format) { + return new MetaDataStateFormat(format.getPrefix()) { + @Override + public void toXContent(XContentBuilder builder, T state) throws IOException { + format.toXContent(builder, state); + } + + @Override + public T fromXContent(XContentParser parser) throws IOException { + return format.fromXContent(parser); + } + + @Override + protected Directory newDirectory(Path dir) { + MockDirectoryWrapper mock = newMockFSDirectory(dir); + if (failRandomly) { + MockDirectoryWrapper.Failure fail = new MockDirectoryWrapper.Failure() { + @Override + public void eval(MockDirectoryWrapper dir) throws IOException { + int r = randomIntBetween(0, invertedFailRate); + if (r == 0) { + throw new MockDirectoryWrapper.FakeIOException(); + } + } + }; + mock.failOn(fail); + } + closeAfterSuite(mock); + return mock; + } + }; + } + + MetaStateServiceWithFailures(int invertedFailRate, NodeEnvironment nodeEnv, NamedXContentRegistry 
namedXContentRegistry) { + super(nodeEnv, namedXContentRegistry); + META_DATA_FORMAT = wrap(MetaData.FORMAT); + INDEX_META_DATA_FORMAT = wrap(IndexMetaData.FORMAT); + MANIFEST_FORMAT = wrap(Manifest.FORMAT); + failRandomly = false; + this.invertedFailRate = invertedFailRate; + } + + void failRandomly() { + failRandomly = true; + } + + void noFailures() { + failRandomly = false; + } + } + + private boolean metaDataEquals(MetaData md1, MetaData md2) { + boolean equals = MetaData.isGlobalStateEquals(md1, md2); + + for (IndexMetaData imd : md1) { + IndexMetaData imd2 = md2.index(imd.getIndex()); + equals = equals && imd.equals(imd2); + } + + for (IndexMetaData imd : md2) { + IndexMetaData imd2 = md1.index(imd.getIndex()); + equals = equals && imd.equals(imd2); + } + return equals; + } + + private static MetaData randomMetaDataForTx() { + int settingNo = randomIntBetween(0, 10); + MetaData.Builder builder = MetaData.builder() + .persistentSettings(Settings.builder().put("setting" + settingNo, randomAlphaOfLength(5)).build()); + int numOfIndices = randomIntBetween(0, 3); + + for (int i = 0; i < numOfIndices; i++) { + int indexNo = randomIntBetween(0, 50); + IndexMetaData indexMetaData = IndexMetaData.builder("index" + indexNo).settings( + Settings.builder() + .put(IndexMetaData.SETTING_INDEX_UUID, "index" + indexNo) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .build() + ).build(); + builder.put(indexMetaData, false); + } + return builder.build(); + } + + public void testAtomicityWithFailures() throws IOException { + try (NodeEnvironment env = newNodeEnvironment()) { + MetaStateServiceWithFailures metaStateService = + new MetaStateServiceWithFailures(randomIntBetween(100, 1000), env, xContentRegistry()); + + // We only guarantee atomicity of writes, if there is initial Manifest file + Manifest manifest = Manifest.empty(); + MetaData metaData = MetaData.EMPTY_META_DATA; + metaStateService.writeManifestAndCleanup("startup", Manifest.empty()); + long currentTerm = randomNonNegativeLong(); + long clusterStateVersion = randomNonNegativeLong(); + + metaStateService.failRandomly(); + Set possibleMetaData = new HashSet<>(); + possibleMetaData.add(metaData); + + for (int i = 0; i < randomIntBetween(1, 5); i++) { + IncrementalClusterStateWriter.AtomicClusterStateWriter writer = + new IncrementalClusterStateWriter.AtomicClusterStateWriter(metaStateService, manifest); + metaData = randomMetaDataForTx(); + Map indexGenerations = new HashMap<>(); + + try { + long globalGeneration = writer.writeGlobalState("global", metaData); + + for (IndexMetaData indexMetaData : metaData) { + long generation = writer.writeIndex("index", indexMetaData); + indexGenerations.put(indexMetaData.getIndex(), generation); + } + + Manifest newManifest = new Manifest(currentTerm, clusterStateVersion, globalGeneration, indexGenerations); + writer.writeManifestAndCleanup("manifest", newManifest); + possibleMetaData.clear(); + possibleMetaData.add(metaData); + manifest = newManifest; + } catch (WriteStateException e) { + if (e.isDirty()) { + possibleMetaData.add(metaData); + /* + * If dirty WriteStateException occurred, it's only safe to proceed if there is subsequent + * successful write of metadata and Manifest. We prefer to break here, not to over complicate test logic. + * See also MetaDataStateFormat#testFailRandomlyAndReadAnyState, that does not break. 
+ */ + break; + } + } + } + + metaStateService.noFailures(); + + Tuple manifestAndMetaData = metaStateService.loadFullState(); + MetaData loadedMetaData = manifestAndMetaData.v2(); + + assertTrue(possibleMetaData.stream().anyMatch(md -> metaDataEquals(md, loadedMetaData))); + } + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java index fe7b8720981..102de69cc43 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -703,9 +703,8 @@ public class AbstractCoordinatorTestCase extends ESTestCase { if (rarely()) { nodeEnvironment = newNodeEnvironment(); nodeEnvironments.add(nodeEnvironment); - final MockGatewayMetaState gatewayMetaState - = new MockGatewayMetaState(Settings.EMPTY, nodeEnvironment, xContentRegistry(), localNode); - gatewayMetaState.start(); + final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(localNode); + gatewayMetaState.start(Settings.EMPTY, nodeEnvironment, xContentRegistry()); delegate = gatewayMetaState.getPersistedState(); } else { nodeEnvironment = null; @@ -736,9 +735,8 @@ public class AbstractCoordinatorTestCase extends ESTestCase { new Manifest(updatedTerm, manifest.getClusterStateVersion(), manifest.getGlobalGeneration(), manifest.getIndexGenerations())); } - final MockGatewayMetaState gatewayMetaState - = new MockGatewayMetaState(Settings.EMPTY, nodeEnvironment, xContentRegistry(), newLocalNode); - gatewayMetaState.start(); + final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(newLocalNode); + gatewayMetaState.start(Settings.EMPTY, nodeEnvironment, xContentRegistry()); delegate = gatewayMetaState.getPersistedState(); } else { nodeEnvironment = null; diff --git a/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java b/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java index 006f2948831..b66b5ea3ee2 100644 --- a/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java +++ b/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java @@ -19,6 +19,7 @@ package org.elasticsearch.gateway; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; @@ -37,24 +38,23 @@ import org.elasticsearch.transport.TransportService; public class MockGatewayMetaState extends GatewayMetaState { private final DiscoveryNode localNode; - public MockGatewayMetaState(Settings settings, NodeEnvironment nodeEnvironment, - NamedXContentRegistry xContentRegistry, DiscoveryNode localNode) { - super(settings, new MetaStateService(nodeEnvironment, xContentRegistry)); + public MockGatewayMetaState(DiscoveryNode localNode) { this.localNode = localNode; } @Override - protected void upgradeMetaData(MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader) { + void upgradeMetaData(Settings settings, MetaStateService metaStateService, MetaDataIndexUpgradeService metaDataIndexUpgradeService, + MetaDataUpgrader metaDataUpgrader) { // MetaData upgrade is tested in GatewayMetaStateTests, we override this method to NOP to make mocking easier 
} @Override - public void applyClusterStateUpdaters(TransportService transportService, ClusterService clusterService) { + ClusterState prepareInitialClusterState(TransportService transportService, ClusterService clusterService, ClusterState clusterState) { // Just set localNode here, not to mess with ClusterService and IndicesService mocking - previousClusterState = ClusterStateUpdaters.setLocalNode(previousClusterState, localNode); + return ClusterStateUpdaters.setLocalNode(clusterState, localNode); } - public void start() { - start(null, null, null, null); + public void start(Settings settings, NodeEnvironment nodeEnvironment, NamedXContentRegistry xContentRegistry) { + start(settings, null, null, new MetaStateService(nodeEnvironment, xContentRegistry), null, null); } }
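For orientation, the refactored test wiring above reduces to the short sketch below: construct MockGatewayMetaState with only the local DiscoveryNode, hand settings, node environment and xContent registry to the new start() overload, and read the node's PersistedState back. This is an illustrative sketch only, not part of the patch; it assumes an ESTestCase subclass (providing buildNewFakeTransportAddress(), newNodeEnvironment() and xContentRegistry()) plus the usual imports (DiscoveryNode, NodeEnvironment, CoordinationState, Settings, Version, Hamcrest's equalTo), and the test name and literal term value are invented for illustration. The newGatewayPersistedState() helper in GatewayMetaStatePersistedStateTests follows the same pattern.

    // Illustrative sketch of the refactored MockGatewayMetaState API (not part of the patch).
    public void testPersistedTermRoundTrip() throws Exception {
        // A DiscoveryNode built this way carries the default roles, so it is master-eligible
        // and GatewayMetaState will use the on-disk persisted state.
        DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT);
        try (NodeEnvironment nodeEnvironment = newNodeEnvironment()) {
            // The constructor now takes only the local node; settings and services move to start().
            MockGatewayMetaState gateway = new MockGatewayMetaState(localNode);
            gateway.start(Settings.EMPTY, nodeEnvironment, xContentRegistry());

            // start() wires up the persisted state, which survives term and cluster-state updates.
            CoordinationState.PersistedState persistedState = gateway.getPersistedState();
            persistedState.setCurrentTerm(42L);
            assertThat(persistedState.getCurrentTerm(), equalTo(42L));
        }
    }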