Fix testDataOnlyNodePersistence (#56893)
This test failed if all 1000 top-level `rarely()` calls in the loop returned `false`, because then we would never set the term of the persisted state. This commit fixes this by adding an earlier call to `persistedState#setCurrentTerm`. It also changes the test to clean up the threadpools it starts whether it passes or fails.
This commit is contained in:
parent
74554f1ae8
commit
64280b489b
|
@ -49,6 +49,7 @@ import org.elasticsearch.threadpool.TestThreadPool;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.TransportService;
|
import org.elasticsearch.transport.TransportService;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
import java.io.IOError;
|
import java.io.IOError;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
|
@ -328,95 +329,103 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testDataOnlyNodePersistence() throws Exception {
|
public void testDataOnlyNodePersistence() throws Exception {
|
||||||
DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Collections.emptyMap(),
|
final List<Closeable> cleanup = new ArrayList<>(2);
|
||||||
Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE), Version.CURRENT);
|
|
||||||
Settings settings = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterName.value()).put(
|
|
||||||
Node.NODE_MASTER_SETTING.getKey(), false).put(Node.NODE_NAME_SETTING.getKey(), "test").build();
|
|
||||||
final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode);
|
|
||||||
final TransportService transportService = mock(TransportService.class);
|
|
||||||
TestThreadPool threadPool = new TestThreadPool("testMarkAcceptedConfigAsCommittedOnDataOnlyNode");
|
|
||||||
when(transportService.getThreadPool()).thenReturn(threadPool);
|
|
||||||
ClusterService clusterService = mock(ClusterService.class);
|
|
||||||
when(clusterService.getClusterSettings()).thenReturn(
|
|
||||||
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
|
|
||||||
final PersistedClusterStateService persistedClusterStateService =
|
|
||||||
new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
|
|
||||||
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
|
|
||||||
gateway.start(settings, transportService, clusterService,
|
|
||||||
new MetaStateService(nodeEnvironment, xContentRegistry()), null, null, persistedClusterStateService);
|
|
||||||
final CoordinationState.PersistedState persistedState = gateway.getPersistedState();
|
|
||||||
assertThat(persistedState, instanceOf(GatewayMetaState.AsyncLucenePersistedState.class));
|
|
||||||
|
|
||||||
//generate random coordinationMetadata with different lastAcceptedConfiguration and lastCommittedConfiguration
|
try {
|
||||||
CoordinationMetadata coordinationMetadata;
|
DiscoveryNode localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Collections.emptyMap(),
|
||||||
do {
|
Sets.newHashSet(DiscoveryNodeRole.DATA_ROLE), Version.CURRENT);
|
||||||
coordinationMetadata = createCoordinationMetadata(randomNonNegativeLong());
|
Settings settings = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterName.value()).put(
|
||||||
} while (coordinationMetadata.getLastAcceptedConfiguration().equals(coordinationMetadata.getLastCommittedConfiguration()));
|
Node.NODE_MASTER_SETTING.getKey(), false).put(Node.NODE_NAME_SETTING.getKey(), "test").build();
|
||||||
|
final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode);
|
||||||
|
cleanup.add(gateway);
|
||||||
|
final TransportService transportService = mock(TransportService.class);
|
||||||
|
TestThreadPool threadPool = new TestThreadPool("testMarkAcceptedConfigAsCommittedOnDataOnlyNode");
|
||||||
|
cleanup.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
|
||||||
|
when(transportService.getThreadPool()).thenReturn(threadPool);
|
||||||
|
ClusterService clusterService = mock(ClusterService.class);
|
||||||
|
when(clusterService.getClusterSettings()).thenReturn(
|
||||||
|
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
|
||||||
|
final PersistedClusterStateService persistedClusterStateService =
|
||||||
|
new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
|
||||||
|
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
|
||||||
|
gateway.start(settings, transportService, clusterService,
|
||||||
|
new MetaStateService(nodeEnvironment, xContentRegistry()), null, null, persistedClusterStateService);
|
||||||
|
final CoordinationState.PersistedState persistedState = gateway.getPersistedState();
|
||||||
|
assertThat(persistedState, instanceOf(GatewayMetaState.AsyncLucenePersistedState.class));
|
||||||
|
|
||||||
ClusterState state = createClusterState(randomNonNegativeLong(),
|
//generate random coordinationMetadata with different lastAcceptedConfiguration and lastCommittedConfiguration
|
||||||
Metadata.builder().coordinationMetadata(coordinationMetadata)
|
CoordinationMetadata coordinationMetadata;
|
||||||
.clusterUUID(randomAlphaOfLength(10)).build());
|
do {
|
||||||
persistedState.setLastAcceptedState(state);
|
coordinationMetadata = createCoordinationMetadata(randomNonNegativeLong());
|
||||||
assertBusy(() -> assertTrue(gateway.allPendingAsyncStatesWritten()));
|
} while (coordinationMetadata.getLastAcceptedConfiguration().equals(coordinationMetadata.getLastCommittedConfiguration()));
|
||||||
|
|
||||||
assertThat(persistedState.getLastAcceptedState().getLastAcceptedConfiguration(),
|
ClusterState state = createClusterState(randomNonNegativeLong(),
|
||||||
not(equalTo(persistedState.getLastAcceptedState().getLastCommittedConfiguration())));
|
Metadata.builder().coordinationMetadata(coordinationMetadata)
|
||||||
CoordinationMetadata persistedCoordinationMetadata =
|
.clusterUUID(randomAlphaOfLength(10)).build());
|
||||||
persistedClusterStateService.loadBestOnDiskState().metadata.coordinationMetadata();
|
persistedState.setCurrentTerm(state.term());
|
||||||
assertThat(persistedCoordinationMetadata.getLastAcceptedConfiguration(),
|
persistedState.setLastAcceptedState(state);
|
||||||
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));
|
assertBusy(() -> assertTrue(gateway.allPendingAsyncStatesWritten()));
|
||||||
assertThat(persistedCoordinationMetadata.getLastCommittedConfiguration(),
|
|
||||||
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));
|
|
||||||
|
|
||||||
persistedState.markLastAcceptedStateAsCommitted();
|
assertThat(persistedState.getLastAcceptedState().getLastAcceptedConfiguration(),
|
||||||
assertBusy(() -> assertTrue(gateway.allPendingAsyncStatesWritten()));
|
not(equalTo(persistedState.getLastAcceptedState().getLastCommittedConfiguration())));
|
||||||
|
CoordinationMetadata persistedCoordinationMetadata =
|
||||||
|
persistedClusterStateService.loadBestOnDiskState().metadata.coordinationMetadata();
|
||||||
|
assertThat(persistedCoordinationMetadata.getLastAcceptedConfiguration(),
|
||||||
|
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));
|
||||||
|
assertThat(persistedCoordinationMetadata.getLastCommittedConfiguration(),
|
||||||
|
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));
|
||||||
|
|
||||||
CoordinationMetadata expectedCoordinationMetadata = CoordinationMetadata.builder(coordinationMetadata)
|
persistedState.markLastAcceptedStateAsCommitted();
|
||||||
.lastCommittedConfiguration(coordinationMetadata.getLastAcceptedConfiguration()).build();
|
assertBusy(() -> assertTrue(gateway.allPendingAsyncStatesWritten()));
|
||||||
ClusterState expectedClusterState =
|
|
||||||
ClusterState.builder(state).metadata(Metadata.builder().coordinationMetadata(expectedCoordinationMetadata)
|
|
||||||
.clusterUUID(state.metadata().clusterUUID()).clusterUUIDCommitted(true).build()).build();
|
|
||||||
|
|
||||||
assertClusterStateEqual(expectedClusterState, persistedState.getLastAcceptedState());
|
CoordinationMetadata expectedCoordinationMetadata = CoordinationMetadata.builder(coordinationMetadata)
|
||||||
persistedCoordinationMetadata = persistedClusterStateService.loadBestOnDiskState().metadata.coordinationMetadata();
|
.lastCommittedConfiguration(coordinationMetadata.getLastAcceptedConfiguration()).build();
|
||||||
assertThat(persistedCoordinationMetadata.getLastAcceptedConfiguration(),
|
ClusterState expectedClusterState =
|
||||||
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));
|
ClusterState.builder(state).metadata(Metadata.builder().coordinationMetadata(expectedCoordinationMetadata)
|
||||||
assertThat(persistedCoordinationMetadata.getLastCommittedConfiguration(),
|
.clusterUUID(state.metadata().clusterUUID()).clusterUUIDCommitted(true).build()).build();
|
||||||
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));
|
|
||||||
assertTrue(persistedClusterStateService.loadBestOnDiskState().metadata.clusterUUIDCommitted());
|
|
||||||
|
|
||||||
// generate a series of updates and check if batching works
|
assertClusterStateEqual(expectedClusterState, persistedState.getLastAcceptedState());
|
||||||
final String indexName = randomAlphaOfLength(10);
|
persistedCoordinationMetadata = persistedClusterStateService.loadBestOnDiskState().metadata.coordinationMetadata();
|
||||||
long currentTerm = state.term();
|
assertThat(persistedCoordinationMetadata.getLastAcceptedConfiguration(),
|
||||||
for (int i = 0; i < 1000; i++) {
|
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));
|
||||||
if (rarely()) {
|
assertThat(persistedCoordinationMetadata.getLastCommittedConfiguration(),
|
||||||
// bump term
|
equalTo(GatewayMetaState.AsyncLucenePersistedState.staleStateConfiguration));
|
||||||
currentTerm = currentTerm + (rarely() ? randomIntBetween(1, 5) : 0L);
|
assertTrue(persistedClusterStateService.loadBestOnDiskState().metadata.clusterUUIDCommitted());
|
||||||
persistedState.setCurrentTerm(currentTerm);
|
|
||||||
} else {
|
// generate a series of updates and check if batching works
|
||||||
// update cluster state
|
final String indexName = randomAlphaOfLength(10);
|
||||||
final int numberOfShards = randomIntBetween(1, 5);
|
long currentTerm = state.term();
|
||||||
final long term = Math.min(state.term() + (rarely() ? randomIntBetween(1, 5) : 0L), currentTerm);
|
for (int i = 0; i < 1000; i++) {
|
||||||
final IndexMetadata indexMetadata = createIndexMetadata(indexName, numberOfShards, i);
|
if (rarely()) {
|
||||||
state = createClusterState(state.version() + 1,
|
// bump term
|
||||||
Metadata.builder().coordinationMetadata(createCoordinationMetadata(term)).put(indexMetadata, false).build());
|
currentTerm = currentTerm + (rarely() ? randomIntBetween(1, 5) : 0L);
|
||||||
persistedState.setLastAcceptedState(state);
|
persistedState.setCurrentTerm(currentTerm);
|
||||||
|
} else {
|
||||||
|
// update cluster state
|
||||||
|
final int numberOfShards = randomIntBetween(1, 5);
|
||||||
|
final long term = Math.min(state.term() + (rarely() ? randomIntBetween(1, 5) : 0L), currentTerm);
|
||||||
|
final IndexMetadata indexMetadata = createIndexMetadata(indexName, numberOfShards, i);
|
||||||
|
state = createClusterState(state.version() + 1,
|
||||||
|
Metadata.builder().coordinationMetadata(createCoordinationMetadata(term)).put(indexMetadata, false).build());
|
||||||
|
persistedState.setLastAcceptedState(state);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
assertEquals(currentTerm, persistedState.getCurrentTerm());
|
||||||
|
assertClusterStateEqual(state, persistedState.getLastAcceptedState());
|
||||||
|
assertBusy(() -> assertTrue(gateway.allPendingAsyncStatesWritten()));
|
||||||
|
|
||||||
|
gateway.close();
|
||||||
|
assertTrue(cleanup.remove(gateway));
|
||||||
|
|
||||||
|
try (CoordinationState.PersistedState reloadedPersistedState = newGatewayPersistedState()) {
|
||||||
|
assertEquals(currentTerm, reloadedPersistedState.getCurrentTerm());
|
||||||
|
assertClusterStateEqual(GatewayMetaState.AsyncLucenePersistedState.resetVotingConfiguration(state),
|
||||||
|
reloadedPersistedState.getLastAcceptedState());
|
||||||
|
assertNotNull(reloadedPersistedState.getLastAcceptedState().metadata().index(indexName));
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
IOUtils.close(cleanup);
|
||||||
}
|
}
|
||||||
assertEquals(currentTerm, persistedState.getCurrentTerm());
|
|
||||||
assertClusterStateEqual(state, persistedState.getLastAcceptedState());
|
|
||||||
assertBusy(() -> assertTrue(gateway.allPendingAsyncStatesWritten()));
|
|
||||||
|
|
||||||
gateway.close();
|
|
||||||
|
|
||||||
try (CoordinationState.PersistedState reloadedPersistedState = newGatewayPersistedState()) {
|
|
||||||
assertEquals(currentTerm, reloadedPersistedState.getCurrentTerm());
|
|
||||||
assertClusterStateEqual(GatewayMetaState.AsyncLucenePersistedState.resetVotingConfiguration(state),
|
|
||||||
reloadedPersistedState.getLastAcceptedState());
|
|
||||||
assertNotNull(reloadedPersistedState.getLastAcceptedState().metadata().index(indexName));
|
|
||||||
}
|
|
||||||
|
|
||||||
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testStatePersistenceWithIOIssues() throws IOException {
|
public void testStatePersistenceWithIOIssues() throws IOException {
|
||||||
|
|
Loading…
Reference in New Issue