Stats : Add time in index throttle to stats.

This commit adds throttle stats to the indexing stats and uses a call back from InternalEngine to manage the stats.
Also includes updates the IndexStatsTests to test for these new stats.
Stats added :
```
throttle_time_in_millis
is_throttled
```

Closes #7861
This commit is contained in:
Brian Murphy 2014-10-22 14:06:07 +01:00
parent 40945aebcf
commit 7333694830
4 changed files with 176 additions and 6 deletions

View File

@ -272,7 +272,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
try {
this.indexWriter = createWriter();
mergeScheduler.removeListener(this.throttle);
this.throttle = new IndexThrottle(mergeScheduler, logger);
this.throttle = new IndexThrottle(mergeScheduler, logger, indexingService);
mergeScheduler.addListener(throttle);
} catch (IOException e) {
maybeFailEngine(e, "start");
@ -844,7 +844,8 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
currentIndexWriter().close(false);
indexWriter = createWriter();
mergeScheduler.removeListener(this.throttle);
this.throttle = new IndexThrottle(mergeScheduler, this.logger);
this.throttle = new IndexThrottle(mergeScheduler, this.logger, indexingService);
mergeScheduler.addListener(throttle);
// commit on a just opened writer will commit even if there are no changes done to it
// we rely on that for the commit data translog id key
@ -1716,7 +1717,8 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
}
private static final class IndexThrottle implements MergeSchedulerProvider.Listener {
static final class IndexThrottle implements MergeSchedulerProvider.Listener {
private static final InternalLock NOOP_LOCK = new InternalLock(new NoOpLock());
private final InternalLock lockReference = new InternalLock(new ReentrantLock());
@ -1724,12 +1726,14 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
private final AtomicBoolean isThrottling = new AtomicBoolean();
private final MergeSchedulerProvider mergeScheduler;
private final ESLogger logger;
private final ShardIndexingService indexingService;
private volatile InternalLock lock = NOOP_LOCK;
public IndexThrottle(MergeSchedulerProvider mergeScheduler, ESLogger logger) {
public IndexThrottle(MergeSchedulerProvider mergeScheduler, ESLogger logger, ShardIndexingService indexingService) {
this.mergeScheduler = mergeScheduler;
this.logger = logger;
this.indexingService = indexingService;
}
public Releasable acquireThrottle() {
@ -1742,6 +1746,7 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
if (numMergesInFlight.incrementAndGet() > maxNumMerges) {
if (isThrottling.getAndSet(true) == false) {
logger.info("now throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
indexingService.throttlingActivated();
}
lock = lockReference;
}
@ -1753,10 +1758,12 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
if (numMergesInFlight.decrementAndGet() < maxNumMerges) {
if (isThrottling.getAndSet(false)) {
logger.info("stop throttling indexing: numMergesInFlight={}, maxNumMerges={}", numMergesInFlight, maxNumMerges);
indexingService.throttlingDeactivated();
}
lock = NOOP_LOCK;
}
}
}
private static final class NoOpLock implements Lock {

View File

@ -49,11 +49,14 @@ public class IndexingStats implements Streamable, ToXContent {
private long noopUpdateCount;
private long throttleTimeInMillis;
private boolean isThrottled;
Stats() {
}
public Stats(long indexCount, long indexTimeInMillis, long indexCurrent, long deleteCount, long deleteTimeInMillis, long deleteCurrent, long noopUpdateCount) {
public Stats(long indexCount, long indexTimeInMillis, long indexCurrent, long deleteCount, long deleteTimeInMillis, long deleteCurrent, long noopUpdateCount, boolean isThrottled, long throttleTimeInMillis) {
this.indexCount = indexCount;
this.indexTimeInMillis = indexTimeInMillis;
this.indexCurrent = indexCurrent;
@ -61,6 +64,8 @@ public class IndexingStats implements Streamable, ToXContent {
this.deleteTimeInMillis = deleteTimeInMillis;
this.deleteCurrent = deleteCurrent;
this.noopUpdateCount = noopUpdateCount;
this.isThrottled = isThrottled;
this.throttleTimeInMillis = throttleTimeInMillis;
}
public void add(Stats stats) {
@ -73,6 +78,10 @@ public class IndexingStats implements Streamable, ToXContent {
deleteCurrent += stats.deleteCurrent;
noopUpdateCount += stats.noopUpdateCount;
throttleTimeInMillis += stats.throttleTimeInMillis;
if (isThrottled != stats.isThrottled) {
isThrottled = true; //When combining if one is throttled set result to throttled.
}
}
public long getIndexCount() {
@ -95,6 +104,30 @@ public class IndexingStats implements Streamable, ToXContent {
return deleteCount;
}
/**
* Returns if the index is under merge throttling control
* @return
*/
public boolean isThrottled() {
return isThrottled;
}
/**
* Gets the amount of time in milliseconds that the index has been under merge throttling control
* @return
*/
public long getThrottleTimeInMillis() {
return throttleTimeInMillis;
}
/**
* Gets the amount of time in a TimeValue that the index has been under merge throttling control
* @return
*/
public TimeValue getThrottleTime() {
return new TimeValue(throttleTimeInMillis);
}
public TimeValue getDeleteTime() {
return new TimeValue(deleteTimeInMillis);
}
@ -130,6 +163,11 @@ public class IndexingStats implements Streamable, ToXContent {
if (in.getVersion().onOrAfter(Version.V_1_4_0_Beta1)) {
noopUpdateCount = in.readVLong();
}
if (in.getVersion().onOrAfter(Version.V_1_4_0)) {
isThrottled = in.readBoolean();
throttleTimeInMillis = in.readLong();
}
}
@Override
@ -145,6 +183,12 @@ public class IndexingStats implements Streamable, ToXContent {
if (out.getVersion().onOrAfter(Version.V_1_4_0_Beta1)) {
out.writeVLong(noopUpdateCount);
}
if (out.getVersion().onOrAfter(Version.V_1_4_0)) {
out.writeBoolean(isThrottled);
out.writeLong(throttleTimeInMillis);
}
}
@Override
@ -159,6 +203,8 @@ public class IndexingStats implements Streamable, ToXContent {
builder.field(Fields.NOOP_UPDATE_TOTAL, noopUpdateCount);
builder.field(Fields.IS_THROTTLED, isThrottled);
builder.timeValueField(Fields.THROTTLED_TIME_IN_MILLIS, Fields.THROTTLED_TIME, throttleTimeInMillis);
return builder;
}
}
@ -239,6 +285,9 @@ public class IndexingStats implements Streamable, ToXContent {
static final XContentBuilderString DELETE_TIME_IN_MILLIS = new XContentBuilderString("delete_time_in_millis");
static final XContentBuilderString DELETE_CURRENT = new XContentBuilderString("delete_current");
static final XContentBuilderString NOOP_UPDATE_TOTAL = new XContentBuilderString("noop_update_total");
static final XContentBuilderString IS_THROTTLED = new XContentBuilderString("is_throttled");
static final XContentBuilderString THROTTLED_TIME_IN_MILLIS = new XContentBuilderString("throttle_time_in_millis");
static final XContentBuilderString THROTTLED_TIME = new XContentBuilderString("throttle_time");
}
public static IndexingStats readIndexingStats(StreamInput in) throws IOException {

View File

@ -107,6 +107,14 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
}
}
public void throttlingActivated() {
totalStats.setThrottled(true);
}
public void throttlingDeactivated() {
totalStats.setThrottled(false);
}
public void postCreate(Engine.Create create) {
long took = create.endTime() - create.startTime();
totalStats.indexMetric.inc(took);
@ -259,12 +267,38 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
public final CounterMetric indexCurrent = new CounterMetric();
public final CounterMetric deleteCurrent = new CounterMetric();
public final CounterMetric noopUpdates = new CounterMetric();
public final CounterMetric throttleTimeMillisMetric = new CounterMetric();
volatile boolean isThrottled = false;
volatile long startOfThrottleMillis;
public IndexingStats.Stats stats() {
long currentThrottleMillis = 0;
if (isThrottled && startOfThrottleMillis != 0) {
currentThrottleMillis += System.currentTimeMillis() - startOfThrottleMillis;
if (currentThrottleMillis < 0) {
//Timeslip must have happened, have to ignore this value
currentThrottleMillis = 0;
}
}
return new IndexingStats.Stats(
indexMetric.count(), TimeUnit.NANOSECONDS.toMillis(indexMetric.sum()), indexCurrent.count(),
deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum()), deleteCurrent.count(),
noopUpdates.count());
noopUpdates.count(), isThrottled, TimeUnit.MILLISECONDS.toMillis(throttleTimeMillisMetric.count() + currentThrottleMillis));
}
void setThrottled(boolean isThrottled) {
if (!this.isThrottled && isThrottled) {
startOfThrottleMillis = System.currentTimeMillis();
} else if (this.isThrottled && !isThrottled) {
assert startOfThrottleMillis > 0 : "Bad state of startOfThrottleMillis";
long throttleTimeMillis = System.currentTimeMillis() - startOfThrottleMillis;
if (throttleTimeMillis >= 0) {
//A timeslip may have occured but never want to add a negative number
throttleTimeMillisMetric.inc(throttleTimeMillis);
}
}
this.isThrottled = isThrottled;
}
public long totalCurrent() {
@ -275,5 +309,7 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
indexMetric.clear();
deleteMetric.clear();
}
}
}

View File

@ -33,7 +33,10 @@ import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.merge.policy.TieredMergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.ConcurrentMergeSchedulerProvider;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.store.support.AbstractIndexStore;
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
@ -281,6 +284,79 @@ public class IndexStatsTests extends ElasticsearchIntegrationTest {
assertThat(client().admin().indices().prepareStats("idx").setQueryCache(true).get().getTotal().getQueryCache().getMemorySizeInBytes(), greaterThan(0l));
}
@Test
public void nonThrottleStats() throws Exception {
assertAcked(prepareCreate("test")
.setSettings(ImmutableSettings.builder()
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "merge")
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
.put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "10000")
));
ensureGreen();
long termUpto = 0;
IndicesStatsResponse stats;
// Provoke slowish merging by making many unique terms:
for(int i=0; i<100; i++) {
StringBuilder sb = new StringBuilder();
for(int j=0; j<100; j++) {
sb.append(' ');
sb.append(termUpto++);
sb.append(" some random text that keeps repeating over and over again hambone");
}
client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
}
refresh();
stats = client().admin().indices().prepareStats().execute().actionGet();
//nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
stats = client().admin().indices().prepareStats().execute().actionGet();
assertThat(stats.getPrimaries().getIndexing().getTotal().getThrottleTimeInMillis(), equalTo(0l));
}
@Test
public void throttleStats() throws Exception {
assertAcked(prepareCreate("test")
.setSettings(ImmutableSettings.builder()
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "merge")
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
.put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "1")
));
ensureGreen();
long termUpto = 0;
IndicesStatsResponse stats;
// make sure we see throttling kicking in:
boolean done = false;
while (!done) {
for(int i=0; i<100; i++) {
// Provoke slowish merging by making many unique terms:
StringBuilder sb = new StringBuilder();
for(int j=0; j<100; j++) {
sb.append(' ');
sb.append(termUpto++);
}
client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
if (i % 2 == 0) {
refresh();
}
}
refresh();
stats = client().admin().indices().prepareStats().execute().actionGet();
//nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
done = stats.getPrimaries().getIndexing().getTotal().getThrottleTimeInMillis() > 0;
}
stats = client().admin().indices().prepareStats().execute().actionGet();
assertThat(stats.getPrimaries().getIndexing().getTotal().getThrottleTimeInMillis(), greaterThan(0l));
}
@Test
public void simpleStats() throws Exception {
createIndex("test1", "test2");
@ -302,6 +378,8 @@ public class IndexStatsTests extends ElasticsearchIntegrationTest {
assertThat(stats.getPrimaries().getDocs().getCount(), equalTo(3l));
assertThat(stats.getTotal().getDocs().getCount(), equalTo(totalExpectedWrites));
assertThat(stats.getPrimaries().getIndexing().getTotal().getIndexCount(), equalTo(3l));
assertThat(stats.getPrimaries().getIndexing().getTotal().isThrottled(), equalTo(false));
assertThat(stats.getPrimaries().getIndexing().getTotal().getThrottleTimeInMillis(), equalTo(0l));
assertThat(stats.getTotal().getIndexing().getTotal().getIndexCount(), equalTo(totalExpectedWrites));
assertThat(stats.getTotal().getStore(), notNullValue());
assertThat(stats.getTotal().getMerge(), notNullValue());