Only check if ttl purger has deleted the docs when the delete count in indexing stats has been increased.
This commit is contained in:
parent
3d0382b562
commit
8fa54b59fb
|
@ -2,6 +2,7 @@ package org.elasticsearch.test.integration.percolator;
|
|||
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
||||
import org.elasticsearch.action.percolate.PercolateResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.client.Requests;
|
||||
|
@ -15,7 +16,6 @@ import org.junit.Test;
|
|||
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
|
||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||
import static org.elasticsearch.test.integration.percolator.SimplePercolatorTests.convertFromTextArray;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
/**
|
||||
|
@ -68,14 +68,28 @@ public class TTLPercolatorTests extends AbstractNodesTests {
|
|||
.endObject()
|
||||
).execute().actionGet();
|
||||
assertThat(convertFromTextArray(percolateResponse.getMatches()), arrayContaining("kuku"));
|
||||
|
||||
long timeSpent = System.currentTimeMillis() - now;
|
||||
long waitTime = ttl + purgeInterval + 200;
|
||||
if (timeSpent <= waitTime) {
|
||||
long timeToWait = waitTime - timeSpent;
|
||||
logger.info("Waiting {} ms for ttl purging...", timeToWait);
|
||||
Thread.sleep(timeToWait);
|
||||
long waitTime = ttl + purgeInterval - timeSpent;
|
||||
if (waitTime >= 0) {
|
||||
Thread.sleep(waitTime); // Doesn't make sense to check the deleteCount before ttl has expired
|
||||
}
|
||||
|
||||
// See comment in SimpleTTLTests
|
||||
logger.info("Checking if the ttl purger has run");
|
||||
long currentDeleteCount;
|
||||
do {
|
||||
if (rarely()) {
|
||||
client.admin().indices().prepareFlush("test").setFull(true).execute().actionGet();
|
||||
} else if (rarely()) {
|
||||
client.admin().indices().prepareOptimize("test").setMaxNumSegments(1).execute().actionGet();
|
||||
}
|
||||
IndicesStatsResponse response = client.admin().indices().prepareStats("test")
|
||||
.clear().setIndexing(true)
|
||||
.execute().actionGet();
|
||||
currentDeleteCount = response.getIndices().get("test").getTotal().getIndexing().getTotal().getDeleteCount();
|
||||
} while (currentDeleteCount < 2); // TTL deletes one doc, but it is indexed in the primary shard and replica shard.
|
||||
assertThat(currentDeleteCount, equalTo(2l));
|
||||
|
||||
percolateResponse = client.preparePercolate("test", "type1").setSource(jsonBuilder()
|
||||
.startObject()
|
||||
.startObject("doc")
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.test.integration.ttl;
|
||||
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.Priority;
|
||||
|
@ -28,7 +29,6 @@ import org.elasticsearch.test.integration.AbstractNodesTests;
|
|||
import org.junit.Test;
|
||||
|
||||
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.*;
|
||||
|
||||
public class SimpleTTLTests extends AbstractNodesTests {
|
||||
|
@ -116,13 +116,32 @@ public class SimpleTTLTests extends AbstractNodesTests {
|
|||
ttl0 = ((Number) getResponse.getField("_ttl").getValue()).longValue();
|
||||
assertThat(ttl0, greaterThan(0L));
|
||||
|
||||
logger.info("--> checking purger");
|
||||
// make sure the purger has done its job for all indexed docs that are expired
|
||||
long shouldBeExpiredDate = now + providedTTLValue + purgeInterval + 2000;
|
||||
now1 = System.currentTimeMillis();
|
||||
if (shouldBeExpiredDate - now1 > 0) {
|
||||
Thread.sleep(shouldBeExpiredDate - now1);
|
||||
}
|
||||
|
||||
// We can't assume that after waiting for ttl + purgeInterval (waitTime) that the document have actually been deleted.
|
||||
// The ttl purging happens in the background in a different thread, and might not have been completed after waiting for waitTime.
|
||||
// But we can use index statistics' delete count to be sure that deletes have been executed, that must be incremented before
|
||||
// ttl purging has finished.
|
||||
logger.info("--> checking purger");
|
||||
long currentDeleteCount;
|
||||
do {
|
||||
if (rarely()) {
|
||||
client.admin().indices().prepareFlush("test").setFull(true).execute().actionGet();
|
||||
} else if (rarely()) {
|
||||
client.admin().indices().prepareOptimize("test").setMaxNumSegments(1).execute().actionGet();
|
||||
}
|
||||
IndicesStatsResponse response = client.admin().indices().prepareStats("test")
|
||||
.clear().setIndexing(true)
|
||||
.execute().actionGet();
|
||||
currentDeleteCount = response.getIndices().get("test").getTotal().getIndexing().getTotal().getDeleteCount();
|
||||
} while (currentDeleteCount < 4); // TTL deletes two docs, but it is indexed in the primary shard and replica shard.
|
||||
assertThat(currentDeleteCount, equalTo(4l));
|
||||
|
||||
// realtime get check
|
||||
getResponse = client.prepareGet("test", "type1", "1").setFields("_ttl").setRealtime(true).execute().actionGet();
|
||||
assertThat(getResponse.isExists(), equalTo(false));
|
||||
|
|
Loading…
Reference in New Issue