Reduce global checkpoint sync interval in disruption tests (#38931)

We verify seq_no_stats is aligned between copies at the end of some
disruption tests. Sometimes, the assertion `assertSeqNos` is tripped due
to a lagged global checkpoint on replicas. The global checkpoint on
replicas is lagged because we sync the global checkpoint 30 seconds (by
default) after the last replication operation. This change reduces the
global checkpoint sync interval to 1s in the disruption tests.

Closes #38318
Closes #36789
This commit is contained in:
Nhat Nguyen 2019-02-15 13:40:17 -05:00
parent a67b9f6d1f
commit 20755e666c
4 changed files with 30 additions and 10 deletions

View File

@ -46,6 +46,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.disruption.NetworkDisruption; import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect; import org.elasticsearch.test.disruption.NetworkDisruption.NetworkDisconnect;
@ -84,7 +85,7 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
@Override @Override
protected Collection<Class<? extends Plugin>> nodePlugins() { protected Collection<Class<? extends Plugin>> nodePlugins() {
// disruption tests need MockTransportService // disruption tests need MockTransportService
return Arrays.asList(MockTransportService.TestPlugin.class); return Arrays.asList(MockTransportService.TestPlugin.class, InternalSettingsPlugin.class);
} }
public void testBulkWeirdScenario() throws Exception { public void testBulkWeirdScenario() throws Exception {
@ -92,7 +93,9 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
internalCluster().startDataOnlyNodes(2); internalCluster().startDataOnlyNodes(2);
assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder() assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder()
.put("index.number_of_shards", 1).put("index.number_of_replicas", 1)).get()); .put("index.number_of_shards", 1).put("index.number_of_replicas", 1)
.put("index.global_checkpoint_sync.interval", "1s"))
.get());
ensureGreen(); ensureGreen();
BulkResponse bulkResponse = client().prepareBulk() BulkResponse bulkResponse = client().prepareBulk()

View File

@ -30,8 +30,10 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.disruption.NetworkDisruption; import org.elasticsearch.test.disruption.NetworkDisruption;
import org.elasticsearch.test.disruption.NetworkDisruption.Bridge; import org.elasticsearch.test.disruption.NetworkDisruption.Bridge;
@ -65,6 +67,13 @@ public abstract class AbstractDisruptionTestCase extends ESIntegTestCase {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(DEFAULT_SETTINGS).build(); return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(DEFAULT_SETTINGS).build();
} }
@Override
public Settings indexSettings() {
return Settings.builder().put(super.indexSettings())
// sync global checkpoint quickly so we can verify seq_no_stats aligned between all copies after tests.
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "1s").build();
}
@Override @Override
protected int numberOfShards() { protected int numberOfShards() {
return 3; return 3;
@ -128,7 +137,7 @@ public abstract class AbstractDisruptionTestCase extends ESIntegTestCase {
@Override @Override
protected Collection<Class<? extends Plugin>> nodePlugins() { protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class); return Arrays.asList(MockTransportService.TestPlugin.class, InternalSettingsPlugin.class);
} }
ClusterState getNodeClusterState(String node) { ClusterState getNodeClusterState(String node) {

View File

@ -110,6 +110,13 @@ public class RelocationIT extends ESIntegTestCase {
internalCluster().assertSameDocIdsOnShards(); internalCluster().assertSameDocIdsOnShards();
} }
@Override
public Settings indexSettings() {
return Settings.builder().put(super.indexSettings())
// sync global checkpoint quickly so we can verify seq_no_stats aligned between all copies after tests.
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "1s").build();
}
public void testSimpleRelocationNoIndexing() { public void testSimpleRelocationNoIndexing() {
logger.info("--> starting [node1] ..."); logger.info("--> starting [node1] ...");
final String node_1 = internalCluster().startNode(); final String node_1 = internalCluster().startNode();
@ -279,8 +286,7 @@ public class RelocationIT extends ESIntegTestCase {
.put("index.number_of_shards", 1) .put("index.number_of_shards", 1)
.put("index.number_of_replicas", numberOfReplicas) .put("index.number_of_replicas", numberOfReplicas)
.put("index.refresh_interval", -1) // we want to control refreshes .put("index.refresh_interval", -1) // we want to control refreshes
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms")) ).get();
.get();
for (int i = 1; i < numberOfNodes; i++) { for (int i = 1; i < numberOfNodes; i++) {
logger.info("--> starting [node_{}] ...", i); logger.info("--> starting [node_{}] ...", i);
@ -465,8 +471,7 @@ public class RelocationIT extends ESIntegTestCase {
final Settings.Builder settings = Settings.builder() final Settings.Builder settings = Settings.builder()
.put("index.routing.allocation.exclude.color", "blue") .put("index.routing.allocation.exclude.color", "blue")
.put(indexSettings()) .put(indexSettings())
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(halfNodes - 1)) .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomInt(halfNodes - 1));
.put(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "100ms");
assertAcked(prepareCreate("test", settings)); assertAcked(prepareCreate("test", settings));
assertAllShardsOnNodes("test", redNodes); assertAllShardsOnNodes("test", redNodes);
int numDocs = randomIntBetween(100, 150); int numDocs = randomIntBetween(100, 150);
@ -518,8 +523,8 @@ public class RelocationIT extends ESIntegTestCase {
prepareCreate("test", Settings.builder() prepareCreate("test", Settings.builder()
.put("index.number_of_shards", 1) .put("index.number_of_shards", 1)
.put("index.number_of_replicas", 0) .put("index.number_of_replicas", 0)
.put("index.refresh_interval", -1) // we want to control refreshes // we want to control refreshes
).get(); .put("index.refresh_interval", -1)).get();
logger.info("--> index 10 docs"); logger.info("--> index 10 docs");
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {

View File

@ -43,6 +43,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.DocIdSeqNoAndTerm; import org.elasticsearch.index.engine.DocIdSeqNoAndTerm;
import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.seqno.SeqNoStats; import org.elasticsearch.index.seqno.SeqNoStats;
@ -62,6 +63,7 @@ import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.BackgroundIndexer; import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.MockHttpTransport; import org.elasticsearch.test.MockHttpTransport;
import org.elasticsearch.test.NodeConfigurationSource; import org.elasticsearch.test.NodeConfigurationSource;
@ -125,7 +127,7 @@ public abstract class CcrIntegTestCase extends ESTestCase {
stopClusters(); stopClusters();
Collection<Class<? extends Plugin>> mockPlugins = Arrays.asList(ESIntegTestCase.TestSeedPlugin.class, Collection<Class<? extends Plugin>> mockPlugins = Arrays.asList(ESIntegTestCase.TestSeedPlugin.class,
MockHttpTransport.TestPlugin.class, MockTransportService.TestPlugin.class, MockHttpTransport.TestPlugin.class, MockTransportService.TestPlugin.class,
MockNioTransportPlugin.class); MockNioTransportPlugin.class, InternalSettingsPlugin.class);
InternalTestCluster leaderCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(), InternalTestCluster leaderCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(),
numberOfNodesPerCluster(), "leader_cluster", createNodeConfigurationSource(null), 0, "leader", mockPlugins, numberOfNodesPerCluster(), "leader_cluster", createNodeConfigurationSource(null), 0, "leader", mockPlugins,
@ -390,6 +392,7 @@ public abstract class CcrIntegTestCase extends ESTestCase {
builder.startObject("settings"); builder.startObject("settings");
{ {
builder.field(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), 0); builder.field(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), 0);
builder.field(IndexService.GLOBAL_CHECKPOINT_SYNC_INTERVAL_SETTING.getKey(), "1s");
builder.field("index.number_of_shards", numberOfShards); builder.field("index.number_of_shards", numberOfShards);
builder.field("index.number_of_replicas", numberOfReplicas); builder.field("index.number_of_replicas", numberOfReplicas);
for (final Map.Entry<String, String> additionalSetting : additionalIndexSettings.entrySet()) { for (final Map.Entry<String, String> additionalSetting : additionalIndexSettings.entrySet()) {