Test: add test case verifying updating merge IO throttle settings works
Closes #6842
This commit is contained in:
parent
b275393e01
commit
80774877ff
|
@ -50,12 +50,12 @@ public abstract class FsDirectoryService extends AbstractIndexShardComponent imp
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final long throttleTimeInNanos() {
|
public long throttleTimeInNanos() {
|
||||||
return rateLimitingTimeInNanos.count();
|
return rateLimitingTimeInNanos.count();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final StoreRateLimiting rateLimiting() {
|
public StoreRateLimiting rateLimiting() {
|
||||||
return indexStore.rateLimiting();
|
return indexStore.rateLimiting();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -135,7 +135,7 @@ public abstract class FsDirectoryService extends AbstractIndexShardComponent imp
|
||||||
protected abstract Directory newFSDirectory(File location, LockFactory lockFactory) throws IOException;
|
protected abstract Directory newFSDirectory(File location, LockFactory lockFactory) throws IOException;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final void onPause(long nanos) {
|
public void onPause(long nanos) {
|
||||||
rateLimitingTimeInNanos.inc(nanos);
|
rateLimitingTimeInNanos.inc(nanos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,16 +19,22 @@
|
||||||
|
|
||||||
package org.elasticsearch.indices.settings;
|
package org.elasticsearch.indices.settings;
|
||||||
|
|
||||||
|
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||||
import org.elasticsearch.ElasticsearchIllegalArgumentException;
|
import org.elasticsearch.ElasticsearchIllegalArgumentException;
|
||||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||||
|
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
|
||||||
|
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
|
||||||
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
|
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.common.Priority;
|
import org.elasticsearch.common.Priority;
|
||||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||||
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
||||||
|
import org.elasticsearch.index.merge.policy.TieredMergePolicyProvider;
|
||||||
|
import org.elasticsearch.index.store.support.AbstractIndexStore;
|
||||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
|
||||||
import static org.hamcrest.Matchers.equalTo;
|
import static org.hamcrest.Matchers.equalTo;
|
||||||
import static org.hamcrest.Matchers.nullValue;
|
import static org.hamcrest.Matchers.nullValue;
|
||||||
|
@ -112,4 +118,120 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
|
||||||
assertThrows(client().prepareIndex("test", "type", "1").setSource("f", 3).setVersion(4), VersionConflictEngineException.class); // delete is should not be in cache
|
assertThrows(client().prepareIndex("test", "type", "1").setSource("f", 3).setVersion(4), VersionConflictEngineException.class); // delete is should not be in cache
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// #6626: make sure we can update throttle settings and the changes take effect
|
||||||
|
@Test
|
||||||
|
@Slow
|
||||||
|
public void testUpdateThrottleSettings() {
|
||||||
|
|
||||||
|
// No throttling at first, only 1 non-replicated shard, force lots of merging:
|
||||||
|
assertAcked(prepareCreate("test")
|
||||||
|
.setSettings(ImmutableSettings.builder()
|
||||||
|
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "none")
|
||||||
|
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
|
||||||
|
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
|
||||||
|
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
|
||||||
|
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
|
||||||
|
));
|
||||||
|
ensureGreen();
|
||||||
|
long termUpto = 0;
|
||||||
|
for(int i=0;i<1000;i++) {
|
||||||
|
// Provoke slowish merging by making many unique terms:
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
for(int j=0;j<100;j++) {
|
||||||
|
sb.append(' ');
|
||||||
|
sb.append(termUpto++);
|
||||||
|
}
|
||||||
|
client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
|
||||||
|
if (i % 2 == 0) {
|
||||||
|
refresh();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// No merge IO throttling should have happened:
|
||||||
|
NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
|
||||||
|
for(NodeStats stats : nodesStats.getNodes()) {
|
||||||
|
assertThat(stats.getIndices().getStore().getThrottleTime().getMillis(), equalTo(0l));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now updates settings to turn on merge throttling lowish rate
|
||||||
|
client()
|
||||||
|
.admin()
|
||||||
|
.indices()
|
||||||
|
.prepareUpdateSettings("test")
|
||||||
|
.setSettings(ImmutableSettings.builder()
|
||||||
|
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "merge")
|
||||||
|
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, "1mb"))
|
||||||
|
.get();
|
||||||
|
|
||||||
|
// Make sure setting says it is in fact changed:
|
||||||
|
GetSettingsResponse getSettingsResponse = client().admin().indices().prepareGetSettings("test").get();
|
||||||
|
assertThat(getSettingsResponse.getSetting("test", AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE), equalTo("merge"));
|
||||||
|
|
||||||
|
// Also make sure we see throttling kicking in:
|
||||||
|
boolean done = false;
|
||||||
|
while (done == false) {
|
||||||
|
// Provoke slowish merging by making many unique terms:
|
||||||
|
for(int i=0;i<5;i++) {
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
for(int j=0;j<100;j++) {
|
||||||
|
sb.append(' ');
|
||||||
|
sb.append(termUpto++);
|
||||||
|
sb.append(" some random text that keeps repeating over and over again hambone");
|
||||||
|
}
|
||||||
|
client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
|
||||||
|
}
|
||||||
|
refresh();
|
||||||
|
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
|
||||||
|
for(NodeStats stats : nodesStats.getNodes()) {
|
||||||
|
long throttleMillis = stats.getIndices().getStore().getThrottleTime().getMillis();
|
||||||
|
if (throttleMillis > 0) {
|
||||||
|
done = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Optimize does a waitForMerges, which we must do to make sure all in-flight (throttled) merges finish:
|
||||||
|
client().admin().indices().prepareOptimize("test").get();
|
||||||
|
|
||||||
|
// Now updates settings to disable merge throttling
|
||||||
|
client()
|
||||||
|
.admin()
|
||||||
|
.indices()
|
||||||
|
.prepareUpdateSettings("test")
|
||||||
|
.setSettings(ImmutableSettings.builder()
|
||||||
|
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "none"))
|
||||||
|
.get();
|
||||||
|
|
||||||
|
// Record current throttling so far
|
||||||
|
long sumThrottleTime = 0;
|
||||||
|
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
|
||||||
|
for(NodeStats stats : nodesStats.getNodes()) {
|
||||||
|
sumThrottleTime += stats.getIndices().getStore().getThrottleTime().getMillis();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make sure no further throttling happens:
|
||||||
|
for(int i=0;i<1000;i++) {
|
||||||
|
// Provoke slowish merging by making many unique terms:
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
for(int j=0;j<100;j++) {
|
||||||
|
sb.append(' ');
|
||||||
|
sb.append(termUpto++);
|
||||||
|
}
|
||||||
|
client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
|
||||||
|
if (i % 2 == 0) {
|
||||||
|
refresh();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
long newSumThrottleTime = 0;
|
||||||
|
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
|
||||||
|
for(NodeStats stats : nodesStats.getNodes()) {
|
||||||
|
newSumThrottleTime += stats.getIndices().getStore().getThrottleTime().getMillis();
|
||||||
|
}
|
||||||
|
|
||||||
|
// No additional merge IO throttling should have happened:
|
||||||
|
assertEquals(sumThrottleTime, newSumThrottleTime);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1116,7 +1116,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Indexes the given {@link IndexRequestBuilder} instances randomly. It shuffles the given builders and either
|
* Indexes the given {@link IndexRequestBuilder} instances randomly. It shuffles the given builders and either
|
||||||
* indexes they in a blocking or async fashion. This is very useful to catch problems that relate to internal document
|
* indexes them in a blocking or async fashion. This is very useful to catch problems that relate to internal document
|
||||||
* ids or index segment creations. Some features might have bug when a given document is the first or the last in a
|
* ids or index segment creations. Some features might have bug when a given document is the first or the last in a
|
||||||
* segment or if only one document is in a segment etc. This method prevents issues like this by randomizing the index
|
* segment or if only one document is in a segment etc. This method prevents issues like this by randomizing the index
|
||||||
* layout.
|
* layout.
|
||||||
|
@ -1139,7 +1139,7 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
|
||||||
* layout.
|
* layout.
|
||||||
*
|
*
|
||||||
* @param forceRefresh if <tt>true</tt> all involved indices are refreshed once the documents are indexed.
|
* @param forceRefresh if <tt>true</tt> all involved indices are refreshed once the documents are indexed.
|
||||||
* @param dummyDocuments if <tt>true</tt> some empty dummy documents are may be randomly inserted into the document list and deleted once
|
* @param dummyDocuments if <tt>true</tt> some empty dummy documents may be randomly inserted into the document list and deleted once
|
||||||
* all documents are indexed. This is useful to produce deleted documents on the server side.
|
* all documents are indexed. This is useful to produce deleted documents on the server side.
|
||||||
* @param builders the documents to index.
|
* @param builders the documents to index.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -23,6 +23,7 @@ import com.google.common.base.Charsets;
|
||||||
import org.apache.lucene.index.CheckIndex;
|
import org.apache.lucene.index.CheckIndex;
|
||||||
import org.apache.lucene.store.Directory;
|
import org.apache.lucene.store.Directory;
|
||||||
import org.apache.lucene.store.LockFactory;
|
import org.apache.lucene.store.LockFactory;
|
||||||
|
import org.apache.lucene.store.StoreRateLimiting;
|
||||||
import org.elasticsearch.common.Nullable;
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.inject.Inject;
|
import org.elasticsearch.common.inject.Inject;
|
||||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||||
|
@ -110,4 +111,19 @@ public class MockFSDirectoryService extends FsDirectoryService {
|
||||||
logger.warn("failed to check index", e);
|
logger.warn("failed to check index", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onPause(long nanos) {
|
||||||
|
delegateService.onPause(nanos);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public StoreRateLimiting rateLimiting() {
|
||||||
|
return delegateService.rateLimiting();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long throttleTimeInNanos() {
|
||||||
|
return delegateService.throttleTimeInNanos();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue