[Tests] SimpleRecoveryLocalGatewayTests.testSingleNodeNoFlush could fail if shards were not started
The test starts a single node, indexes into, restarts the node and checks that no data was lost. It only indexed into 2 shards and didn't wait for green meaning that the node could be restarted with non-started primary. In that case the node will not re-assign the primary as it was not started. This commit makes sure that we either wait for primaries to start or index into all shards which has the same net effect. Also extending some logging in InternalIndexShard.
This commit is contained in:
parent
909cf4de44
commit
baea1827d1
|
@ -378,7 +378,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
||||||
writeAllowed(create.origin());
|
writeAllowed(create.origin());
|
||||||
create = indexingService.preCreate(create);
|
create = indexingService.preCreate(create);
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("index {}", create.docs());
|
logger.trace("index [{}][{}]{}", create.type(), create.id(), create.docs());
|
||||||
}
|
}
|
||||||
engine.create(create);
|
engine.create(create);
|
||||||
create.endTime(System.nanoTime());
|
create.endTime(System.nanoTime());
|
||||||
|
@ -400,7 +400,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
||||||
index = indexingService.preIndex(index);
|
index = indexingService.preIndex(index);
|
||||||
try {
|
try {
|
||||||
if (logger.isTraceEnabled()) {
|
if (logger.isTraceEnabled()) {
|
||||||
logger.trace("index {}", index.docs());
|
logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs());
|
||||||
}
|
}
|
||||||
engine.index(index);
|
engine.index(index);
|
||||||
index.endTime(System.nanoTime());
|
index.endTime(System.nanoTime());
|
||||||
|
|
|
@ -38,6 +38,8 @@ import org.elasticsearch.test.TestCluster.RestartCallback;
|
||||||
import org.elasticsearch.test.store.MockDirectoryHelper;
|
import org.elasticsearch.test.store.MockDirectoryHelper;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
|
||||||
|
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
|
||||||
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
|
||||||
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
|
||||||
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
|
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
|
||||||
|
@ -111,32 +113,68 @@ public class SimpleRecoveryLocalGatewayTests extends ElasticsearchIntegrationTes
|
||||||
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type1")
|
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type1")
|
||||||
.startObject("properties").startObject("field").field("type", "string").endObject().startObject("num").field("type", "integer").endObject().endObject()
|
.startObject("properties").startObject("field").field("type", "string").endObject().startObject("num").field("type", "integer").endObject().endObject()
|
||||||
.endObject().endObject().string();
|
.endObject().endObject().string();
|
||||||
assertAcked(prepareCreate("test").addMapping("type1", mapping));
|
// note: default replica settings are tied to #data nodes-1 which is 0 here. We can do with 1 in this test.
|
||||||
|
int numberOfShards = numberOfShards();
|
||||||
|
assertAcked(prepareCreate("test").setSettings(
|
||||||
|
SETTING_NUMBER_OF_SHARDS, numberOfShards(),
|
||||||
|
SETTING_NUMBER_OF_REPLICAS, randomIntBetween(0, 1)
|
||||||
|
).addMapping("type1", mapping));
|
||||||
|
|
||||||
|
int value1Docs;
|
||||||
|
int value2Docs;
|
||||||
|
boolean indexToAllShards = randomBoolean();
|
||||||
|
|
||||||
|
if (indexToAllShards) {
|
||||||
|
// insert enough docs so all shards will have a doc
|
||||||
|
value1Docs = randomIntBetween(numberOfShards * 10, numberOfShards * 20);
|
||||||
|
value2Docs = randomIntBetween(numberOfShards * 10, numberOfShards * 20);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// insert a two docs, some shards will not have anything
|
||||||
|
value1Docs = 1;
|
||||||
|
value2Docs = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
for (int i = 0; i < 1 + randomInt(100); i++) {
|
||||||
|
for (int id = 0; id < Math.max(value1Docs, value2Docs); id++) {
|
||||||
|
if (id < value1Docs) {
|
||||||
|
index("test", "type1", "1_" + id,
|
||||||
|
jsonBuilder().startObject().field("field", "value1").startArray("num").value(14).value(179).endArray().endObject()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if (id < value2Docs) {
|
||||||
|
index("test", "type1", "2_" + id,
|
||||||
|
jsonBuilder().startObject().field("field", "value2").startArray("num").value(14).endArray().endObject()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for (int i = 0; i < 100; i++) {
|
|
||||||
client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("_id", "1").field("field", "value1").startArray("num").value(14).value(179).endArray().endObject()).execute().actionGet();
|
|
||||||
client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("_id", "2").field("field", "value2").startArray("num").value(14).endArray().endObject()).execute().actionGet();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
refresh();
|
refresh();
|
||||||
|
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i <= randomInt(10); i++) {
|
||||||
assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2);
|
assertHitCount(client().prepareCount().setQuery(matchAllQuery()).get(), value1Docs + value2Docs);
|
||||||
assertHitCount(client().prepareCount().setQuery(termQuery("field", "value1")).execute().actionGet(), 1);
|
assertHitCount(client().prepareCount().setQuery(termQuery("field", "value1")).get(), value1Docs);
|
||||||
assertHitCount(client().prepareCount().setQuery(termQuery("field", "value2")).execute().actionGet(), 1);
|
assertHitCount(client().prepareCount().setQuery(termQuery("field", "value2")).get(), value2Docs);
|
||||||
assertHitCount(client().prepareCount().setQuery(termQuery("num", 179)).execute().actionGet(), 1);
|
assertHitCount(client().prepareCount().setQuery(termQuery("num", 179)).get(), value1Docs);
|
||||||
|
}
|
||||||
|
if (!indexToAllShards) {
|
||||||
|
// we have to verify primaries are started for them to be restored
|
||||||
|
logger.info("Ensure all primaries have been started");
|
||||||
|
ensureYellow();
|
||||||
}
|
}
|
||||||
|
|
||||||
cluster().fullRestart();
|
cluster().fullRestart();
|
||||||
|
|
||||||
logger.info("Running Cluster Health (wait for the shards to startup)");
|
logger.info("Running Cluster Health (wait for the shards to startup)");
|
||||||
ensureYellow();
|
ensureYellow();
|
||||||
|
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i <= randomInt(10); i++) {
|
||||||
assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2);
|
assertHitCount(client().prepareCount().setQuery(matchAllQuery()).get(), value1Docs + value2Docs);
|
||||||
assertHitCount(client().prepareCount().setQuery(termQuery("field", "value1")).execute().actionGet(), 1);
|
assertHitCount(client().prepareCount().setQuery(termQuery("field", "value1")).get(), value1Docs);
|
||||||
assertHitCount(client().prepareCount().setQuery(termQuery("field", "value2")).execute().actionGet(), 1);
|
assertHitCount(client().prepareCount().setQuery(termQuery("field", "value2")).get(), value2Docs);
|
||||||
assertHitCount(client().prepareCount().setQuery(termQuery("num", 179)).execute().actionGet(), 1);
|
assertHitCount(client().prepareCount().setQuery(termQuery("num", 179)).get(), value1Docs);
|
||||||
}
|
}
|
||||||
|
|
||||||
cluster().fullRestart();
|
cluster().fullRestart();
|
||||||
|
@ -145,11 +183,11 @@ public class SimpleRecoveryLocalGatewayTests extends ElasticsearchIntegrationTes
|
||||||
logger.info("Running Cluster Health (wait for the shards to startup)");
|
logger.info("Running Cluster Health (wait for the shards to startup)");
|
||||||
ensureYellow();
|
ensureYellow();
|
||||||
|
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i <= randomInt(10); i++) {
|
||||||
assertHitCount(client().prepareCount().setQuery(matchAllQuery()).execute().actionGet(), 2);
|
assertHitCount(client().prepareCount().setQuery(matchAllQuery()).get(), value1Docs + value2Docs);
|
||||||
assertHitCount(client().prepareCount().setQuery(termQuery("field", "value1")).execute().actionGet(), 1);
|
assertHitCount(client().prepareCount().setQuery(termQuery("field", "value1")).get(), value1Docs);
|
||||||
assertHitCount(client().prepareCount().setQuery(termQuery("field", "value2")).execute().actionGet(), 1);
|
assertHitCount(client().prepareCount().setQuery(termQuery("field", "value2")).get(), value2Docs);
|
||||||
assertHitCount(client().prepareCount().setQuery(termQuery("num", 179)).execute().actionGet(), 1);
|
assertHitCount(client().prepareCount().setQuery(termQuery("num", 179)).get(), value1Docs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue