diff --git a/server/src/main/java/org/elasticsearch/gateway/ClusterStateUpdaters.java b/server/src/main/java/org/elasticsearch/gateway/ClusterStateUpdaters.java index b9dbae83b79..056919ab1c4 100644 --- a/server/src/main/java/org/elasticsearch/gateway/ClusterStateUpdaters.java +++ b/server/src/main/java/org/elasticsearch/gateway/ClusterStateUpdaters.java @@ -130,6 +130,13 @@ public class ClusterStateUpdaters { .build(); } + public static ClusterState addStateNotRecoveredBlock(ClusterState state) { + return ClusterState.builder(state) + .blocks(ClusterBlocks.builder() + .blocks(state.blocks()).addGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK).build()) + .build(); + } + static ClusterState mixCurrentStateAndRecoveredState(final ClusterState currentState, final ClusterState recoveredState) { assert currentState.metaData().indices().isEmpty(); diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java index d380cd93c47..f14d86c7602 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java @@ -28,7 +28,6 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; -import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.coordination.CoordinationState; import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState; import org.elasticsearch.cluster.coordination.InMemoryPersistedState; @@ -64,8 +63,6 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.UnaryOperator; -import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; - /** * This class is responsible for storing/retrieving metadata to/from disk. * When instance of this class is created, constructor ensures that this version is compatible with state stored on disk and performs @@ -124,11 +121,9 @@ public class GatewayMetaState implements ClusterStateApplier, CoordinationState. previousManifest = manifestAndMetaData.v1(); final MetaData metaData = manifestAndMetaData.v2(); - final ClusterBlocks.Builder blocks = ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK); previousClusterState = ClusterState.builder(clusterName) .version(previousManifest.getClusterStateVersion()) - .blocks(blocks.build()) .metaData(metaData).build(); logger.debug("took {} to load state", TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - startNS))); @@ -139,6 +134,7 @@ public class GatewayMetaState implements ClusterStateApplier, CoordinationState. assert transportService.getLocalNode() != null : "transport service is not yet started"; previousClusterState = Function.identity() + .andThen(ClusterStateUpdaters::addStateNotRecoveredBlock) .andThen(state -> ClusterStateUpdaters.setLocalNode(state, transportService.getLocalNode())) .andThen(state -> ClusterStateUpdaters.upgradeAndArchiveUnknownOrInvalidSettings(state, clusterService.getClusterSettings())) .andThen(state -> ClusterStateUpdaters.closeBadIndices(state, indicesService)) diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index 98e21581d6f..96746f3343e 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -57,12 +57,15 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.zen.PublishClusterStateStats; import org.elasticsearch.discovery.zen.UnicastHostsProvider.HostsResolver; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.gateway.MockGatewayMetaState; import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.disruption.DisruptableMockTransport; import org.elasticsearch.test.disruption.DisruptableMockTransport.ConnectionStatus; import org.elasticsearch.transport.TransportService; import org.hamcrest.Matcher; +import org.junit.After; import org.junit.Before; import java.io.IOException; @@ -131,6 +134,16 @@ import static org.hamcrest.Matchers.startsWith; public class CoordinatorTests extends ESTestCase { + private final List nodeEnvironments = new ArrayList<>(); + + @After + public void closeNodeEnvironmentsAfterEachTest() { + for (NodeEnvironment nodeEnvironment : nodeEnvironments) { + nodeEnvironment.close(); + } + nodeEnvironments.clear(); + } + @Before public void resetPortCounterBeforeEachTest() { resetPortCounter(); @@ -1102,8 +1115,8 @@ public class CoordinatorTests extends ESTestCase { private final Set blackholedNodes = new HashSet<>(); private final Map committedStatesByVersion = new HashMap<>(); - private final Function defaultPersistedStateSupplier = localNode -> new MockPersistedState(0L, - clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L)); + private final Function defaultPersistedStateSupplier = + localNode -> new MockPersistedState(localNode); Cluster(int initialNodeCount) { this(initialNodeCount, true); @@ -1490,35 +1503,77 @@ public class CoordinatorTests extends ESTestCase { return getAnyNode(); } - class MockPersistedState extends InMemoryPersistedState { - MockPersistedState(long term, ClusterState acceptedState) { - super(term, acceptedState); + class MockPersistedState implements PersistedState { + private final PersistedState delegate; + private final NodeEnvironment nodeEnvironment; + + MockPersistedState(DiscoveryNode localNode) { + try { + if (rarely()) { + nodeEnvironment = newNodeEnvironment(); + nodeEnvironments.add(nodeEnvironment); + delegate = new MockGatewayMetaState(Settings.EMPTY, nodeEnvironment, xContentRegistry(), localNode) + .getPersistedState(Settings.EMPTY, null); + } else { + nodeEnvironment = null; + delegate = new InMemoryPersistedState(0L, + clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L)); + } + } catch (IOException e) { + throw new UncheckedIOException("Unable to create MockPersistedState", e); + } + } + + MockPersistedState(DiscoveryNode newLocalNode, MockPersistedState oldState) { + try { + if (oldState.nodeEnvironment != null) { + nodeEnvironment = oldState.nodeEnvironment; + delegate = new MockGatewayMetaState(Settings.EMPTY, nodeEnvironment, xContentRegistry(), newLocalNode) + .getPersistedState(Settings.EMPTY, null); + } else { + nodeEnvironment = null; + BytesStreamOutput outStream = new BytesStreamOutput(); + outStream.setVersion(Version.CURRENT); + oldState.getLastAcceptedState().writeTo(outStream); + StreamInput inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(), + new NamedWriteableRegistry(ClusterModule.getNamedWriteables())); + delegate = new InMemoryPersistedState(oldState.getCurrentTerm(), ClusterState.readFrom(inStream, + newLocalNode)); // adapts it to new localNode instance + } + } catch (IOException e) { + throw new UncheckedIOException("Unable to create MockPersistedState", e); + } } private void possiblyFail(String description) { if (disruptStorage && rarely()) { - // TODO revisit this when we've decided how PersistedState should throw exceptions logger.trace("simulating IO exception [{}]", description); - if (randomBoolean()) { - throw new UncheckedIOException(new IOException("simulated IO exception [" + description + ']')); - } else { - throw new CoordinationStateRejectedException("simulated IO exception [" + description + ']'); - } + // In the real-life IOError might be thrown, for example if state fsync fails. + // This will require node restart and we're not emulating it here. + throw new UncheckedIOException(new IOException("simulated IO exception [" + description + ']')); } } + @Override + public long getCurrentTerm() { + return delegate.getCurrentTerm(); + } + + @Override + public ClusterState getLastAcceptedState() { + return delegate.getLastAcceptedState(); + } + @Override public void setCurrentTerm(long currentTerm) { possiblyFail("before writing term of " + currentTerm); - super.setCurrentTerm(currentTerm); - // TODO possiblyFail() here if that's a failure mode of the storage layer + delegate.setCurrentTerm(currentTerm); } @Override public void setLastAcceptedState(ClusterState clusterState) { possiblyFail("before writing last-accepted state of term=" + clusterState.term() + ", version=" + clusterState.version()); - super.setLastAcceptedState(clusterState); - // TODO possiblyFail() here if that's a failure mode of the storage layer + delegate.setLastAcceptedState(clusterState); } } @@ -1528,7 +1583,7 @@ public class CoordinatorTests extends ESTestCase { private final int nodeIndex; private Coordinator coordinator; private final DiscoveryNode localNode; - private final PersistedState persistedState; + private final MockPersistedState persistedState; private FakeClusterApplier clusterApplier; private AckedFakeThreadPoolMasterService masterService; private TransportService transportService; @@ -1540,7 +1595,7 @@ public class CoordinatorTests extends ESTestCase { this(nodeIndex, createDiscoveryNode(nodeIndex, masterEligible), defaultPersistedStateSupplier); } - ClusterNode(int nodeIndex, DiscoveryNode localNode, Function persistedStateSupplier) { + ClusterNode(int nodeIndex, DiscoveryNode localNode, Function persistedStateSupplier) { this.nodeIndex = nodeIndex; this.localNode = localNode; persistedState = persistedStateSupplier.apply(localNode); @@ -1608,19 +1663,7 @@ public class CoordinatorTests extends ESTestCase { UUIDs.randomBase64UUID(random()), // generated deterministically for repeatable tests address.address().getHostString(), address.getAddress(), address, Collections.emptyMap(), localNode.isMasterNode() ? EnumSet.allOf(Role.class) : emptySet(), Version.CURRENT); - final PersistedState newPersistedState; - try { - BytesStreamOutput outStream = new BytesStreamOutput(); - outStream.setVersion(Version.CURRENT); - persistedState.getLastAcceptedState().writeTo(outStream); - StreamInput inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(), - new NamedWriteableRegistry(ClusterModule.getNamedWriteables())); - newPersistedState = new MockPersistedState(persistedState.getCurrentTerm(), - ClusterState.readFrom(inStream, newLocalNode)); // adapts it to new localNode instance - } catch (IOException e) { - throw new UncheckedIOException(e); - } - return new ClusterNode(nodeIndex, newLocalNode, node -> newPersistedState); + return new ClusterNode(nodeIndex, newLocalNode, node -> new MockPersistedState(newLocalNode, persistedState)); } private PersistedState getPersistedState() { diff --git a/server/src/test/java/org/elasticsearch/gateway/ClusterStateUpdatersTests.java b/server/src/test/java/org/elasticsearch/gateway/ClusterStateUpdatersTests.java index 36f462fd90e..b34bcf87bdb 100644 --- a/server/src/test/java/org/elasticsearch/gateway/ClusterStateUpdatersTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/ClusterStateUpdatersTests.java @@ -46,6 +46,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static org.elasticsearch.cluster.metadata.MetaData.CLUSTER_READ_ONLY_BLOCK; +import static org.elasticsearch.gateway.ClusterStateUpdaters.addStateNotRecoveredBlock; import static org.elasticsearch.gateway.ClusterStateUpdaters.closeBadIndices; import static org.elasticsearch.gateway.ClusterStateUpdaters.hideStateIfNotRecovered; import static org.elasticsearch.gateway.ClusterStateUpdaters.mixCurrentStateAndRecoveredState; @@ -182,6 +183,24 @@ public class ClusterStateUpdatersTests extends ESTestCase { assertFalse(newState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)); } + public void testAddStateNotRecoveredBlock() { + final MetaData.Builder metaDataBuilder = MetaData.builder() + .persistentSettings(Settings.builder().put("test", "test").build()); + final IndexMetaData indexMetaData = createIndexMetaData("test", Settings.EMPTY); + metaDataBuilder.put(indexMetaData, false); + + final ClusterState initialState = ClusterState + .builder(ClusterState.EMPTY_STATE) + .metaData(metaDataBuilder) + .build(); + assertFalse(initialState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)); + + final ClusterState newState = addStateNotRecoveredBlock(initialState); + + assertMetaDataEquals(initialState, newState); + assertTrue(newState.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK)); + } + public void testCloseBadIndices() throws IOException { final IndicesService indicesService = mock(IndicesService.class); final IndexMetaData good = createIndexMetaData("good", Settings.EMPTY); diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java index 64fdb37b1c8..921bcac3d4c 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -27,49 +27,20 @@ import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfigE import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.Manifest; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.env.NodeEnvironment; -import org.elasticsearch.indices.IndicesService; -import org.elasticsearch.plugins.MetaDataUpgrader; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.TransportService; import java.io.IOException; import java.util.Collections; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; -import static org.mockito.Mockito.mock; public class GatewayMetaStatePersistedStateTests extends ESTestCase { - private class GatewayMetaStateUT extends GatewayMetaState { - private final DiscoveryNode localNode; - - GatewayMetaStateUT(Settings settings, NodeEnvironment nodeEnvironment, DiscoveryNode localNode) throws IOException { - super(settings, nodeEnvironment, new MetaStateService(nodeEnvironment, xContentRegistry()), - mock(MetaDataIndexUpgradeService.class), mock(MetaDataUpgrader.class), - mock(TransportService.class), mock(ClusterService.class), - mock(IndicesService.class)); - this.localNode = localNode; - } - - @Override - protected void upgradeMetaData(MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader) { - // MetaData upgrade is tested in GatewayMetaStateTests, we override this method to NOP to make mocking easier - } - - @Override - public void applyClusterStateUpdaters() { - // Just set localNode here, not to mess with ClusterService and IndicesService mocking - previousClusterState = ClusterStateUpdaters.setLocalNode(previousClusterState, localNode); - } - } - private NodeEnvironment nodeEnvironment; private ClusterName clusterName; private Settings settings; @@ -91,13 +62,13 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { super.tearDown(); } - private GatewayMetaStateUT newGateway() throws IOException { - GatewayMetaStateUT gateway = new GatewayMetaStateUT(settings, nodeEnvironment, localNode); + private MockGatewayMetaState newGateway() throws IOException { + MockGatewayMetaState gateway = new MockGatewayMetaState(settings, nodeEnvironment, xContentRegistry(), localNode); gateway.applyClusterStateUpdaters(); return gateway; } - private GatewayMetaStateUT maybeNew(GatewayMetaStateUT gateway) throws IOException { + private MockGatewayMetaState maybeNew(MockGatewayMetaState gateway) throws IOException { if (randomBoolean()) { return newGateway(); } @@ -105,7 +76,7 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { } public void testInitialState() throws IOException { - GatewayMetaStateUT gateway = newGateway(); + MockGatewayMetaState gateway = newGateway(); ClusterState state = gateway.getLastAcceptedState(); assertThat(state.getClusterName(), equalTo(clusterName)); assertTrue(MetaData.isGlobalStateEquals(state.metaData(), MetaData.EMPTY_META_DATA)); @@ -117,7 +88,7 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { } public void testSetCurrentTerm() throws IOException { - GatewayMetaStateUT gateway = newGateway(); + MockGatewayMetaState gateway = newGateway(); for (int i = 0; i < randomIntBetween(1, 5); i++) { final long currentTerm = randomNonNegativeLong(); @@ -171,7 +142,7 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { } public void testSetLastAcceptedState() throws IOException { - GatewayMetaStateUT gateway = newGateway(); + MockGatewayMetaState gateway = newGateway(); final long term = randomNonNegativeLong(); for (int i = 0; i < randomIntBetween(1, 5); i++) { @@ -194,7 +165,7 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { } public void testSetLastAcceptedStateTermChanged() throws IOException { - GatewayMetaStateUT gateway = newGateway(); + MockGatewayMetaState gateway = newGateway(); final String indexName = randomAlphaOfLength(10); final int numberOfShards = randomIntBetween(1, 5); @@ -218,7 +189,7 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { } public void testCurrentTermAndTermAreDifferent() throws IOException { - GatewayMetaStateUT gateway = newGateway(); + MockGatewayMetaState gateway = newGateway(); long currentTerm = randomNonNegativeLong(); long term = randomValueOtherThan(currentTerm, () -> randomNonNegativeLong()); @@ -233,7 +204,7 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { } public void testMarkAcceptedConfigAsCommitted() throws IOException { - GatewayMetaStateUT gateway = newGateway(); + MockGatewayMetaState gateway = newGateway(); //generate random coordinationMetaData with different lastAcceptedConfiguration and lastCommittedConfiguration CoordinationMetaData coordinationMetaData; diff --git a/server/src/test/java/org/elasticsearch/gateway/MockGatewayMetaState.java b/server/src/test/java/org/elasticsearch/gateway/MockGatewayMetaState.java new file mode 100644 index 00000000000..7541ca860de --- /dev/null +++ b/server/src/test/java/org/elasticsearch/gateway/MockGatewayMetaState.java @@ -0,0 +1,66 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.gateway; + +import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.plugins.MetaDataUpgrader; +import org.elasticsearch.transport.TransportService; + +import java.io.IOException; + +import static org.mockito.Mockito.mock; + +/** + * {@link GatewayMetaState} constructor accepts a lot of arguments. + * It's not always easy / convenient to construct these dependencies. + * This class constructor takes far fewer dependencies and constructs usable {@link GatewayMetaState} with 2 restrictions: + * no metadata upgrade will be performed and no cluster state updaters will be run. This is sufficient for most of the tests. + * Metadata upgrade is tested in {@link GatewayMetaStateTests} and different {@link ClusterStateUpdaters} in + * {@link ClusterStateUpdatersTests}. + */ +public class MockGatewayMetaState extends GatewayMetaState { + private final DiscoveryNode localNode; + + public MockGatewayMetaState(Settings settings, NodeEnvironment nodeEnvironment, + NamedXContentRegistry xContentRegistry, DiscoveryNode localNode) throws IOException { + super(settings, nodeEnvironment, new MetaStateService(nodeEnvironment, xContentRegistry), + mock(MetaDataIndexUpgradeService.class), mock(MetaDataUpgrader.class), + mock(TransportService.class), mock(ClusterService.class), + mock(IndicesService.class)); + this.localNode = localNode; + } + + @Override + protected void upgradeMetaData(MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader) { + // MetaData upgrade is tested in GatewayMetaStateTests, we override this method to NOP to make mocking easier + } + + @Override + public void applyClusterStateUpdaters() { + // Just set localNode here, not to mess with ClusterService and IndicesService mocking + previousClusterState = ClusterStateUpdaters.setLocalNode(previousClusterState, localNode); + } +}