SecurityTribeIT - only wait for number nodes if they are not already there.

Original commit: elastic/x-pack-elasticsearch@3fa5da519a
This commit is contained in:
Boaz Leskes 2016-10-19 14:30:09 +02:00
parent baf1596418
commit 29e35267c3
1 changed files with 27 additions and 23 deletions

View File

@ -43,6 +43,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.function.Predicate;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
@ -167,34 +168,37 @@ public class SecurityTribeIT extends NativeRealmIntegTestCase {
tribeClient = getClientWrapper().apply(tribeNode.client()); tribeClient = getClientWrapper().apply(tribeNode.client());
ClusterStateObserver observer = new ClusterStateObserver(tribeNode.injector().getInstance(ClusterService.class), ClusterStateObserver observer = new ClusterStateObserver(tribeNode.injector().getInstance(ClusterService.class),
logger, new ThreadContext(settings)); logger, new ThreadContext(settings));
CountDownLatch latch = new CountDownLatch(1);
final int cluster1Nodes = internalCluster().size(); final int cluster1Nodes = internalCluster().size();
final int cluster2Nodes = cluster2.size(); final int cluster2Nodes = cluster2.size();
logger.info("waiting for [{}] nodes to be added to the tribe cluster state", cluster1Nodes + cluster2Nodes + 2); logger.info("waiting for [{}] nodes to be added to the tribe cluster state", cluster1Nodes + cluster2Nodes + 2);
observer.waitForNextChange(new ClusterStateObserver.Listener() { final Predicate<ClusterState> nodeCountPredicate = state -> state.nodes().getSize() == cluster1Nodes + cluster2Nodes + 3;
@Override if (nodeCountPredicate.test(observer.observedState()) == false) {
public void onNewClusterState(ClusterState state) { CountDownLatch latch = new CountDownLatch(1);
latch.countDown(); observer.waitForNextChange(new ClusterStateObserver.Listener() {
} @Override
public void onNewClusterState(ClusterState state) {
latch.countDown();
}
@Override @Override
public void onClusterServiceClose() { public void onClusterServiceClose() {
fail("tribe cluster service closed"); fail("tribe cluster service closed");
latch.countDown(); latch.countDown();
} }
@Override @Override
public void onTimeout(TimeValue timeout) { public void onTimeout(TimeValue timeout) {
fail("timed out waiting for nodes to be added to tribe's cluster state"); fail("timed out waiting for nodes to be added to tribe's cluster state");
latch.countDown(); latch.countDown();
} }
}, new ClusterStateObserver.ValidationPredicate() { }, new ClusterStateObserver.ValidationPredicate() {
@Override @Override
protected boolean validate(ClusterState newState) { protected boolean validate(ClusterState newState) {
return newState.nodes().getSize() == cluster1Nodes + cluster2Nodes + 3; return nodeCountPredicate.test(newState);
} }
}); });
latch.await(); latch.await();
}
} }
public void testThatTribeCanAuthenticateElasticUser() throws Exception { public void testThatTribeCanAuthenticateElasticUser() throws Exception {