[Test] ensureStableCluster failed to pass viaNode parameter correctly

Also improved timeouts & logs
Boaz Leskes 2014-06-24 11:33:43 +02:00
parent f7b962a417
commit a7a61a0392
2 changed files with 25 additions and 8 deletions
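
For context, a minimal sketch of the bug this commit fixes, based on the two-argument ensureStableCluster overload shown in the @@ -608 hunk below. The method fragment belongs to the test class in that diff; the example call in the trailing comment is illustrative and not part of this commit.

    // Before this commit: the overload accepted viaNode but forwarded null,
    // so the caller's request to run the cluster-health check via a specific
    // node was silently ignored.
    private void ensureStableCluster(int nodeCount, @Nullable String viaNode) {
        ensureStableCluster(nodeCount, TimeValue.timeValueSeconds(30), null); // bug: should forward viaNode
    }

    // The fix forwards viaNode, so a call such as ensureStableCluster(2, nonIsolatedNode)
    // (hypothetical call site) really checks cluster stability as seen by that node.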


@@ -272,7 +272,7 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationTest
         networkPartition.stopDisrupting();
         // Wait until the master node sees all 3 nodes again.
-        ensureStableCluster(3);
+        ensureStableCluster(3, new TimeValue(30000 + networkPartition.expectedTimeToHeal().millis()));
         logger.info("verifying all nodes return all data");
         for (Client client : clients()) {
@@ -329,7 +329,7 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationTest
         // restore isolation
         scheme.stopDisrupting();
-        ensureStableCluster(3);
+        ensureStableCluster(3, new TimeValue(30000 + scheme.expectedTimeToHeal().millis()));
         logger.info("issue a reroute");
         // trigger a reroute now, instead of waiting for the background reroute of RerouteService
@@ -415,13 +415,15 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationTest
                             assertThat(response.getVersion(), equalTo(1l));
                             ackedDocs.put(id, node);
                             logger.trace("[{}] indexed id [{}] through node [{}]", name, id, node);
+                        } catch (ElasticsearchException e) {
+                            exceptedExceptions.add(e);
+                            logger.trace("[{}] failed id [{}] through node [{}]", e, name, id, node);
                         } finally {
                             countDownLatchRef.get().countDown();
                             logger.trace("[{}] decreased counter : {}", name, countDownLatchRef.get().getCount());
                         }
-                    } catch (ElasticsearchException | InterruptedException e) {
-                        exceptedExceptions.add(e);
-                        logger.trace("[{}] failed id [{}] through node [{}]", e, name, id, node);
+                    } catch (InterruptedException e) {
+                        // fine - semaphore interrupt
                     } catch (Throwable t) {
                         logger.info("unexpected exception in background thread of [{}]", t, node);
                     }
@@ -455,7 +457,7 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationTest
                 assertThat(semaphore.availablePermits(), equalTo(0));
                 semaphore.release(docsPerIndexer);
             }
-            assertTrue(countDownLatchRef.get().await(30000 + disruptionScheme.expectedTimeToHeal().millis() * (docsPerIndexer * indexers.size()), TimeUnit.MILLISECONDS));
+            assertTrue(countDownLatchRef.get().await(60000 + disruptionScheme.expectedTimeToHeal().millis() * (docsPerIndexer * indexers.size()), TimeUnit.MILLISECONDS));
            logger.info("stopping disruption");
            disruptionScheme.stopDisrupting();
@@ -478,7 +480,13 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationTest
                 logger.info("done validating (iteration [{}])", iter);
             }
         } finally {
-            logger.debug("Excepted exception during disruption [{}]", exceptedExceptions);
+            if (exceptedExceptions.size() > 0) {
+                StringBuilder sb = new StringBuilder("Indexing exceptions during disruption:");
+                for (Exception e : exceptedExceptions) {
+                    sb.append("\n").append(e.getMessage());
+                }
+                logger.debug(sb.toString());
+            }
             logger.info("shutting down indexers");
             stop.set(true);
             for (Thread indexer : indexers) {
@@ -608,10 +616,11 @@ public class DiscoveryWithNetworkFailuresTests extends ElasticsearchIntegrationTest
     }

     private void ensureStableCluster(int nodeCount, @Nullable String viaNode) {
-        ensureStableCluster(nodeCount, TimeValue.timeValueSeconds(30), null);
+        ensureStableCluster(nodeCount, TimeValue.timeValueSeconds(30), viaNode);
     }

     private void ensureStableCluster(int nodeCount, TimeValue timeValue, @Nullable String viaNode) {
+        logger.debug("ensuring cluster is stable with [{}] nodes. access node: [{}]. timeout: [{}]", nodeCount, viaNode, timeValue);
         ClusterHealthResponse clusterHealthResponse = client(viaNode).admin().cluster().prepareHealth()
                 .setWaitForEvents(Priority.LANGUID)
                 .setWaitForNodes(Integer.toString(nodeCount))


@@ -836,7 +836,15 @@ public final class InternalTestCluster extends TestCluster {
     }

     private synchronized void reset(boolean wipeData) throws IOException {
+        TimeValue expectedHealingTime = activeDisruptionScheme != null ? activeDisruptionScheme.expectedTimeToHeal() : null;
         clearDisruptionScheme();
+        if (expectedHealingTime != null && expectedHealingTime.millis() > 0) {
+            try {
+                Thread.sleep(expectedHealingTime.millis());
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
         randomlyResetClients();
         if (wipeData) {
             wipeDataDirectories();