[TEST] testAckedIndexing waits for all nodes to stabilize

testAckedIndexing now waits for all nodes to stabilize in the cluster
state through an assertBusy before final validation that all documents
are found in tehir respective shards in the cluster.  Before, what could
happen is that the ensureGreen check passes but only after that is a
ping failure from the network disruption processed by the master,
thereby rendering the cluster RED again.  This assertBusy waits up to 30
seconds for all nodes to have stabilized and all get document actions to
succeed.
This commit is contained in:
Ali Beyad 2017-01-18 13:51:25 -05:00
parent 2766b08ff4
commit cd52065871
1 changed files with 11 additions and 18 deletions

View File

@ -24,6 +24,7 @@ import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
@ -584,28 +585,20 @@ public class DiscoveryWithServiceDisruptionsIT extends ESIntegTestCase {
}
ensureGreen("test");
// make sure all nodes have the updated cluster state with the latest routing table
final long clusterStateVersionOnMaster = internalCluster().clusterService(internalCluster().getMasterName())
.state().getVersion();
logger.info("validating successful docs");
assertBusy(() -> {
for (String node : nodes) {
assertThat(internalCluster().clusterService(node).state().getVersion(),
greaterThanOrEqualTo(clusterStateVersionOnMaster));
}
});
logger.info("validating successful docs");
for (String node : nodes) {
try {
logger.debug("validating through node [{}] ([{}] acked docs)", node, ackedDocs.size());
for (String id : ackedDocs.keySet()) {
assertTrue("doc [" + id + "] indexed via node [" + ackedDocs.get(id) + "] not found",
client(node).prepareGet("test", "type", id).setPreference("_local").get().isExists());
try {
logger.debug("validating through node [{}] ([{}] acked docs)", node, ackedDocs.size());
for (String id : ackedDocs.keySet()) {
assertTrue("doc [" + id + "] indexed via node [" + ackedDocs.get(id) + "] not found",
client(node).prepareGet("test", "type", id).setPreference("_local").get().isExists());
}
} catch (AssertionError | NoShardAvailableActionException e) {
throw new AssertionError(e.getMessage() + " (checked via node [" + node + "]", e);
}
} catch (AssertionError e) {
throw new AssertionError(e.getMessage() + " (checked via node [" + node + "]", e);
}
}
}, 30, TimeUnit.SECONDS);
logger.info("done validating (iteration [{}])", iter);
}