diff --git a/src/main/java/org/elasticsearch/index/store/fs/FsDirectoryService.java b/src/main/java/org/elasticsearch/index/store/fs/FsDirectoryService.java index 31a4aaf8ec2..342915f2d14 100644 --- a/src/main/java/org/elasticsearch/index/store/fs/FsDirectoryService.java +++ b/src/main/java/org/elasticsearch/index/store/fs/FsDirectoryService.java @@ -50,12 +50,12 @@ public abstract class FsDirectoryService extends AbstractIndexShardComponent imp } @Override - public final long throttleTimeInNanos() { + public long throttleTimeInNanos() { return rateLimitingTimeInNanos.count(); } @Override - public final StoreRateLimiting rateLimiting() { + public StoreRateLimiting rateLimiting() { return indexStore.rateLimiting(); } @@ -135,7 +135,7 @@ public abstract class FsDirectoryService extends AbstractIndexShardComponent imp protected abstract Directory newFSDirectory(File location, LockFactory lockFactory) throws IOException; @Override - public final void onPause(long nanos) { + public void onPause(long nanos) { rateLimitingTimeInNanos.inc(nanos); } } diff --git a/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsTests.java b/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsTests.java index 177933b4bac..500d65089fc 100644 --- a/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsTests.java +++ b/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsTests.java @@ -19,16 +19,22 @@ package org.elasticsearch.indices.settings; +import org.apache.lucene.util.LuceneTestCase.Slow; import org.elasticsearch.ElasticsearchIllegalArgumentException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.index.engine.VersionConflictEngineException; +import org.elasticsearch.index.merge.policy.TieredMergePolicyProvider; +import org.elasticsearch.index.store.support.AbstractIndexStore; import org.elasticsearch.test.ElasticsearchIntegrationTest; import org.junit.Test; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; @@ -112,4 +118,120 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest { assertThrows(client().prepareIndex("test", "type", "1").setSource("f", 3).setVersion(4), VersionConflictEngineException.class); // delete is should not be in cache } + + // #6626: make sure we can update throttle settings and the changes take effect + @Test + @Slow + public void testUpdateThrottleSettings() { + + // No throttling at first, only 1 non-replicated shard, force lots of merging: + assertAcked(prepareCreate("test") + .setSettings(ImmutableSettings.builder() + .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "none") + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1") + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0") + .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2") + .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2") + )); + ensureGreen(); + long termUpto = 0; + for(int i=0;i<1000;i++) { + // Provoke slowish merging by making many unique terms: + StringBuilder sb = new StringBuilder(); + for(int j=0;j<100;j++) { + sb.append(' '); + sb.append(termUpto++); + } + client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get(); + if (i % 2 == 0) { + refresh(); + } + } + + // No merge IO throttling should have happened: + NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get(); + for(NodeStats stats : nodesStats.getNodes()) { + assertThat(stats.getIndices().getStore().getThrottleTime().getMillis(), equalTo(0l)); + } + + // Now updates settings to turn on merge throttling lowish rate + client() + .admin() + .indices() + .prepareUpdateSettings("test") + .setSettings(ImmutableSettings.builder() + .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "merge") + .put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, "1mb")) + .get(); + + // Make sure setting says it is in fact changed: + GetSettingsResponse getSettingsResponse = client().admin().indices().prepareGetSettings("test").get(); + assertThat(getSettingsResponse.getSetting("test", AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE), equalTo("merge")); + + // Also make sure we see throttling kicking in: + boolean done = false; + while (done == false) { + // Provoke slowish merging by making many unique terms: + for(int i=0;i<5;i++) { + StringBuilder sb = new StringBuilder(); + for(int j=0;j<100;j++) { + sb.append(' '); + sb.append(termUpto++); + sb.append(" some random text that keeps repeating over and over again hambone"); + } + client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get(); + } + refresh(); + nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get(); + for(NodeStats stats : nodesStats.getNodes()) { + long throttleMillis = stats.getIndices().getStore().getThrottleTime().getMillis(); + if (throttleMillis > 0) { + done = true; + break; + } + } + } + + // Optimize does a waitForMerges, which we must do to make sure all in-flight (throttled) merges finish: + client().admin().indices().prepareOptimize("test").get(); + + // Now updates settings to disable merge throttling + client() + .admin() + .indices() + .prepareUpdateSettings("test") + .setSettings(ImmutableSettings.builder() + .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "none")) + .get(); + + // Record current throttling so far + long sumThrottleTime = 0; + nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get(); + for(NodeStats stats : nodesStats.getNodes()) { + sumThrottleTime += stats.getIndices().getStore().getThrottleTime().getMillis(); + } + + // Make sure no further throttling happens: + for(int i=0;i<1000;i++) { + // Provoke slowish merging by making many unique terms: + StringBuilder sb = new StringBuilder(); + for(int j=0;j<100;j++) { + sb.append(' '); + sb.append(termUpto++); + } + client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get(); + if (i % 2 == 0) { + refresh(); + } + } + + long newSumThrottleTime = 0; + nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get(); + for(NodeStats stats : nodesStats.getNodes()) { + newSumThrottleTime += stats.getIndices().getStore().getThrottleTime().getMillis(); + } + + // No additional merge IO throttling should have happened: + assertEquals(sumThrottleTime, newSumThrottleTime); + } } diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index 0e3106456ce..b5c6369818a 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -1116,7 +1116,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase /** * Indexes the given {@link IndexRequestBuilder} instances randomly. It shuffles the given builders and either - * indexes they in a blocking or async fashion. This is very useful to catch problems that relate to internal document + * indexes them in a blocking or async fashion. This is very useful to catch problems that relate to internal document * ids or index segment creations. Some features might have bug when a given document is the first or the last in a * segment or if only one document is in a segment etc. This method prevents issues like this by randomizing the index * layout. @@ -1139,7 +1139,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase * layout. * * @param forceRefresh if true all involved indices are refreshed once the documents are indexed. - * @param dummyDocuments if true some empty dummy documents are may be randomly inserted into the document list and deleted once + * @param dummyDocuments if true some empty dummy documents may be randomly inserted into the document list and deleted once * all documents are indexed. This is useful to produce deleted documents on the server side. * @param builders the documents to index. */ diff --git a/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java b/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java index 8ea159e8716..78dcc47f756 100644 --- a/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java +++ b/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java @@ -23,6 +23,7 @@ import com.google.common.base.Charsets; import org.apache.lucene.index.CheckIndex; import org.apache.lucene.store.Directory; import org.apache.lucene.store.LockFactory; +import org.apache.lucene.store.StoreRateLimiting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -110,4 +111,19 @@ public class MockFSDirectoryService extends FsDirectoryService { logger.warn("failed to check index", e); } } + + @Override + public void onPause(long nanos) { + delegateService.onPause(nanos); + } + + @Override + public StoreRateLimiting rateLimiting() { + return delegateService.rateLimiting(); + } + + @Override + public long throttleTimeInNanos() { + return delegateService.throttleTimeInNanos(); + } }