SOLR-14322 Improve AbstractFullDistribZkTestBase.waitForThingsToLevelOut

This commit is contained in:
Mike Drob 2020-03-11 14:32:01 -05:00
parent ea864b43a3
commit a31ecd2eb8
29 changed files with 112 additions and 104 deletions

View File

@ -47,6 +47,9 @@ Other Changes
* SOLR-14012: Return long value for unique and hll aggregations irrespective of shard count (Munendra S N, hossman)
* SOLR-14322: AbstractFullDistribZkTestBase.waitForThingsToLevelOut now takes a timeout and time unit instead of
assuming that the value passed is in seconds. (Mike Drob)
================== 8.6.0 ==================
Consult the LUCENE_CHANGES.txt file for additional, low level, changes in this release.

View File

@ -18,6 +18,7 @@ package org.apache.solr.cloud;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.mockfile.FilterPath;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
@ -388,7 +389,7 @@ public class BasicDistributedZk2Test extends AbstractFullDistribZkTestBase {
// make sure we have published we are recovering
Thread.sleep(1500);
waitForThingsToLevelOut(60);
waitForThingsToLevelOut(1, TimeUnit.MINUTES);
Thread.sleep(500);

View File

@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
@ -231,7 +232,7 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
Thread.sleep(2000);
// wait until there are no recoveries...
waitForThingsToLevelOut(Integer.MAX_VALUE);//Math.round((runLength / 1000.0f / 3.0f)));
waitForThingsToLevelOut();
// make sure we again have leaders for each shard
for (int j = 1; j < sliceCount; j++) {
@ -255,7 +256,7 @@ public class ChaosMonkeyNothingIsSafeTest extends AbstractFullDistribZkTestBase
}
waitForThingsToLevelOut(20);
waitForThingsToLevelOut(20, TimeUnit.SECONDS);
commit();

View File

@ -250,7 +250,7 @@ public class ChaosMonkeyNothingIsSafeWithPullReplicasTest extends AbstractFullDi
ChaosMonkey.wait(2000, DEFAULT_COLLECTION, zkStateReader);
// wait until there are no recoveries...
waitForThingsToLevelOut(Integer.MAX_VALUE);//Math.round((runLength / 1000.0f / 3.0f)));
waitForThingsToLevelOut();
// make sure we again have leaders for each shard
for (int j = 1; j < sliceCount; j++) {

View File

@ -153,14 +153,14 @@ public class ChaosMonkeySafeLeaderTest extends AbstractFullDistribZkTestBase {
Thread.sleep(2000);
waitForThingsToLevelOut(180000);
waitForThingsToLevelOut(3, TimeUnit.MINUTES);
// even if things were leveled out, a jetty may have just been stopped or something
// we wait again and wait to level out again to make sure the system is not still in flux
Thread.sleep(3000);
waitForThingsToLevelOut(180000);
waitForThingsToLevelOut(3, TimeUnit.MINUTES);
checkShardConsistency(batchSize == 1, true);

View File

@ -195,14 +195,14 @@ public class ChaosMonkeySafeLeaderWithPullReplicasTest extends AbstractFullDistr
Thread.sleep(2000);
waitForThingsToLevelOut(180000);
waitForThingsToLevelOut(3, TimeUnit.MINUTES);
// even if things were leveled out, a jetty may have just been stopped or something
// we wait again and wait to level out again to make sure the system is not still in flux
Thread.sleep(3000);
waitForThingsToLevelOut(180000);
waitForThingsToLevelOut(3, TimeUnit.MINUTES);
log.info("control docs:" + controlClient.query(new SolrQuery("*:*")).getResults().getNumFound() + "\n\n");

View File

@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.util.LuceneTestCase.Slow;
@ -68,7 +69,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
@Test
public void test() throws Exception {
waitForThingsToLevelOut(15);
waitForThingsToLevelOut(15, TimeUnit.SECONDS);
ClusterState clusterState = cloudClient.getZkStateReader().getClusterState();
final DocRouter router = clusterState.getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION).getRouter();
@ -113,7 +114,7 @@ public class ChaosMonkeyShardSplitTest extends ShardSplitTest {
Thread.sleep(2000);
waitForThingsToLevelOut(90);
waitForThingsToLevelOut(90, TimeUnit.SECONDS);
Thread.sleep(1000);
checkShardConsistency(false, true);

View File

@ -127,14 +127,14 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
@Test
// commented out on: 24-Dec-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
public void test() throws Exception {
waitForThingsToLevelOut(30000);
waitForThingsToLevelOut(30, TimeUnit.SECONDS);
testDoRecoveryOnRestart();
// test a 1x2 collection
testRf2();
waitForThingsToLevelOut(30000);
waitForThingsToLevelOut(30, TimeUnit.SECONDS);
// now do similar for a 1x3 collection while taking 2 replicas on-and-off
if (TEST_NIGHTLY) {
@ -142,12 +142,12 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
testRf3();
}
waitForThingsToLevelOut(30000);
waitForThingsToLevelOut(30, TimeUnit.SECONDS);
// have the leader lose its Zk session temporarily
testLeaderZkSessionLoss();
waitForThingsToLevelOut(30000);
waitForThingsToLevelOut(30, TimeUnit.SECONDS);
log.info("HttpPartitionTest succeeded ... shutting down now!");
}

View File

@ -52,7 +52,7 @@ public class LeaderFailoverAfterPartitionTest extends HttpPartitionTest {
@Test
//28-June-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
public void test() throws Exception {
waitForThingsToLevelOut(30000);
waitForThingsToLevelOut(30, TimeUnit.SECONDS);
// kill a leader and make sure recovery occurs as expected
testRf3WithLeaderFailover();

View File

@ -26,6 +26,7 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.codec.digest.DigestUtils;
@ -48,7 +49,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
*
@ -119,7 +119,7 @@ public class LeaderFailureAfterFreshStartTest extends AbstractFullDistribZkTestB
forceNodeFailures(singletonList(freshNode));
del("*:*");
waitForThingsToLevelOut(30);
waitForThingsToLevelOut(30, TimeUnit.SECONDS);
checkShardConsistency(false, true);
@ -129,7 +129,7 @@ public class LeaderFailureAfterFreshStartTest extends AbstractFullDistribZkTestB
"document number " + docId++);
}
commit();
waitForThingsToLevelOut(30);
waitForThingsToLevelOut(30, TimeUnit.SECONDS);
checkShardConsistency(false, true);
@ -154,7 +154,7 @@ public class LeaderFailureAfterFreshStartTest extends AbstractFullDistribZkTestB
// shutdown the original leader
log.info("Now shutting down initial leader");
forceNodeFailures(singletonList(initialLeaderJetty));
waitForNewLeader(cloudClient, "shard1", (Replica)initialLeaderJetty.client.info , new TimeOut(15, SECONDS, TimeSource.NANO_TIME));
waitForNewLeader(cloudClient, "shard1", (Replica)initialLeaderJetty.client.info , new TimeOut(15, TimeUnit.SECONDS, TimeSource.NANO_TIME));
waitTillNodesActive();
log.info("Updating mappings from zk");
updateMappingsFromZk(jettys, clients, true);

View File

@ -29,6 +29,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.codahale.metrics.Counter;
@ -56,7 +57,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
* Test PeerSync when a node restarts and documents are indexed when node was down.
@ -113,7 +113,7 @@ public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
handle.clear();
handle.put("timestamp", SKIPVAL);
waitForThingsToLevelOut(30);
waitForThingsToLevelOut(30, TimeUnit.SECONDS);
del("*:*");
@ -123,7 +123,7 @@ public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
"document number " + docId++);
}
commit();
waitForThingsToLevelOut(30);
waitForThingsToLevelOut(30, TimeUnit.SECONDS);
try {
checkShardConsistency(false, true);
@ -156,14 +156,14 @@ public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
// now shutdown all other nodes except for 'nodeShutDownForFailure'
otherJetties.remove(nodePeerSynced);
forceNodeFailures(otherJetties);
waitForThingsToLevelOut(30);
waitForThingsToLevelOut(30, TimeUnit.SECONDS);
checkShardConsistency(false, true);
// now shutdown the original leader
log.info("Now shutting down initial leader");
forceNodeFailures(singletonList(initialLeaderJetty));
log.info("Updating mappings from zk");
waitForNewLeader(cloudClient, "shard1", (Replica) initialLeaderJetty.client.info, new TimeOut(15, SECONDS, TimeSource.NANO_TIME));
waitForNewLeader(cloudClient, "shard1", (Replica) initialLeaderJetty.client.info, new TimeOut(15, TimeUnit.SECONDS, TimeSource.NANO_TIME));
updateMappingsFromZk(jettys, clients, true);
assertEquals("PeerSynced node did not become leader", nodePeerSynced, shardToLeaderJetty.get("shard1"));
@ -308,14 +308,14 @@ public class PeerSyncReplicationTest extends AbstractFullDistribZkTestBase {
nodesDown.remove(nodeToBringUp);
waitTillNodesActive();
waitForThingsToLevelOut(30);
waitForThingsToLevelOut(30, TimeUnit.SECONDS);
Set<CloudJettyRunner> jetties = new HashSet<>();
jetties.addAll(shardToJetty.get("shard1"));
jetties.removeAll(nodesDown);
assertEquals(getShardCount() - nodesDown.size(), jetties.size());
waitForThingsToLevelOut(30);
waitForThingsToLevelOut(30, TimeUnit.SECONDS);
iib.join();

View File

@ -24,6 +24,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.StringUtils;
@ -80,19 +81,19 @@ public class ReplicationFactorTest extends AbstractFullDistribZkTestBase {
// commented out on: 24-Dec-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // added 20-Jul-2018
public void test() throws Exception {
log.info("replication factor test running");
waitForThingsToLevelOut(30000);
waitForThingsToLevelOut(30, TimeUnit.SECONDS);
// test a 1x3 collection
log.info("Testing replication factor handling for repfacttest_c8n_1x3");
testRf3();
waitForThingsToLevelOut(30000);
waitForThingsToLevelOut(30, TimeUnit.SECONDS);
// test handling when not using direct updates
log.info("Now testing replication factor handling for repfacttest_c8n_2x2");
testRf2NotUsingDirectUpdates();
waitForThingsToLevelOut(30000);
waitForThingsToLevelOut(30, TimeUnit.SECONDS);
log.info("replication factor testing complete! final clusterState is: "+
cloudClient.getZkStateReader().getClusterState());
}

View File

@ -19,6 +19,7 @@ package org.apache.solr.cloud;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.LuceneTestCase.Nightly;
import org.apache.lucene.util.LuceneTestCase.Slow;
@ -138,11 +139,11 @@ public class RestartWhileUpdatingTest extends AbstractFullDistribZkTestBase {
Thread.sleep(1000);
waitForThingsToLevelOut(320);
waitForThingsToLevelOut(320, TimeUnit.SECONDS);
Thread.sleep(2000);
waitForThingsToLevelOut(30);
waitForThingsToLevelOut(30, TimeUnit.SECONDS);
Thread.sleep(5000);

View File

@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.cli.CommandLine;
import org.apache.http.HttpEntity;
@ -68,7 +69,7 @@ public class SolrCloudExampleTest extends AbstractFullDistribZkTestBase {
@Test
// 12-Jun-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 04-May-2018
public void testLoadDocsIntoGettingStartedCollection() throws Exception {
waitForThingsToLevelOut(30000);
waitForThingsToLevelOut(30, TimeUnit.SECONDS);
log.info("testLoadDocsIntoGettingStartedCollection initialized OK ... running test logic");

View File

@ -39,6 +39,7 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* Test sync phase that occurs when Leader goes down and a new Leader is
@ -68,7 +69,7 @@ public class SyncSliceTest extends AbstractFullDistribZkTestBase {
handle.clear();
handle.put("timestamp", SKIPVAL);
waitForThingsToLevelOut(30);
waitForThingsToLevelOut(30, TimeUnit.SECONDS);
del("*:*");
List<CloudJettyRunner> skipServers = new ArrayList<>();
@ -113,7 +114,7 @@ public class SyncSliceTest extends AbstractFullDistribZkTestBase {
baseClient.request(request);
}
waitForThingsToLevelOut(15);
waitForThingsToLevelOut(15, TimeUnit.SECONDS);
checkShardConsistency(false, true);

View File

@ -25,6 +25,7 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.TestUtil;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
@ -114,7 +115,7 @@ public class TestCloudPivotFacet extends AbstractFullDistribZkTestBase {
//commented 2-Aug-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 28-June-2018
public void test() throws Exception {
waitForThingsToLevelOut(30000); // TODO: why would we have to wait?
waitForThingsToLevelOut(30, TimeUnit.SECONDS); // TODO: why would we have to wait?
//
handle.clear();
handle.put("QTime", SKIPVAL);

View File

@ -25,6 +25,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.solr.BaseDistributedSearchTestCase;
@ -53,7 +54,7 @@ public class TestDynamicFieldNamesIndexCorrectly extends AbstractFullDistribZkTe
@Test
@BaseDistributedSearchTestCase.ShardsFixed(num = 3)
public void test() throws Exception {
waitForThingsToLevelOut(30);
waitForThingsToLevelOut(30, TimeUnit.SECONDS);
createCollection(COLLECTION, "conf1", 4, 1, 4);
final int numRuns = 10;

View File

@ -19,6 +19,7 @@ package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
import org.apache.solr.client.solrj.embedded.JettySolrRunner;
@ -59,7 +60,7 @@ public class TestOnReconnectListenerSupport extends AbstractFullDistribZkTestBas
@Test
public void test() throws Exception {
waitForThingsToLevelOut(30000);
waitForThingsToLevelOut(30, TimeUnit.SECONDS);
String testCollectionName = "c8n_onreconnect_1x1";
String shardId = "shard1";

View File

@ -24,6 +24,7 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.codahale.metrics.Counter;
@ -59,7 +60,7 @@ public class TestRandomRequestDistribution extends AbstractFullDistribZkTestBase
@Test
@BaseDistributedSearchTestCase.ShardsFixed(num = 3)
public void test() throws Exception {
waitForThingsToLevelOut(30);
waitForThingsToLevelOut(30, TimeUnit.SECONDS);
for (CloudJettyRunner cloudJetty : cloudJettys) {
nodeNames.add(cloudJetty.nodeName);

View File

@ -136,7 +136,7 @@ public class TestSolrCloudWithKerberosAlt extends SolrCloudTestCase {
CollectionAdminRequest.deleteCollection(collectionName).process(client);
AbstractDistribZkTestBase.waitForCollectionToDisappear
(collectionName, client.getZkStateReader(), true, true, 330);
(collectionName, client.getZkStateReader(), true, 330);
}
@Override

View File

@ -26,6 +26,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.math3.primes.Primes;
@ -470,7 +471,7 @@ public class TestStressInPlaceUpdates extends AbstractFullDistribZkTestBase {
// what we can do however, is commit all completed updates, and *then* compare solr search results
// against the (new) committed model....
waitForThingsToLevelOut(30); // NOTE: this does an automatic commit for us & ensures replicas are up to date
waitForThingsToLevelOut(30, TimeUnit.SECONDS); // NOTE: this does an automatic commit for us & ensures replicas are up to date
committedModel = new HashMap<>(model);
// first, prune the model of any docs that have negative versions

View File

@ -19,6 +19,7 @@ package org.apache.solr.cloud;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.util.LuceneTestCase.Nightly;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
@ -94,7 +95,7 @@ public class TlogReplayBufferedWhileIndexingTest extends AbstractFullDistribZkTe
Thread.sleep(45000);
waitForThingsToLevelOut(600); // we can insert random update delays, so this can take a while, especially when beasting this test
waitForThingsToLevelOut(); // we can insert random update delays, so this can take a while, especially when beasting this test
Thread.sleep(2000);
@ -104,7 +105,7 @@ public class TlogReplayBufferedWhileIndexingTest extends AbstractFullDistribZkTe
thread.safeStop();
}
waitForThingsToLevelOut(30);
waitForThingsToLevelOut(30, TimeUnit.SECONDS);
checkShardConsistency(false, false);

View File

@ -100,7 +100,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
@Nightly
public void test() throws Exception {
waitForThingsToLevelOut(15);
waitForThingsToLevelOut(15, TimeUnit.SECONDS);
if (usually()) {
log.info("Using legacyCloud=false for cluster");
@ -133,7 +133,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
}
private void doSplitStaticIndexReplication(SolrIndexSplitter.SplitMethod splitMethod) throws Exception {
waitForThingsToLevelOut(15);
waitForThingsToLevelOut(15, TimeUnit.SECONDS);
DocCollection defCol = cloudClient.getZkStateReader().getClusterState().getCollection(AbstractDistribZkTestBase.DEFAULT_COLLECTION);
Replica replica = defCol.getReplicas().get(0);
@ -283,7 +283,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
//05-Jul-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
// commented out on: 24-Dec-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // added 15-Sep-2018
public void testSplitAfterFailedSplit() throws Exception {
waitForThingsToLevelOut(15);
waitForThingsToLevelOut(15, TimeUnit.SECONDS);
TestInjection.splitFailureBeforeReplicaCreation = "true:100"; // we definitely want split to fail
try {
@ -332,7 +332,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
@Test
@Nightly
public void testSplitAfterFailedSplit2() throws Exception {
waitForThingsToLevelOut(15);
waitForThingsToLevelOut(15, TimeUnit.SECONDS);
TestInjection.splitFailureAfterReplicaCreation = "true:100"; // we definitely want split to fail
try {
@ -355,7 +355,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
}
private void doSplitMixedReplicaTypes(SolrIndexSplitter.SplitMethod splitMethod) throws Exception {
waitForThingsToLevelOut(15);
waitForThingsToLevelOut(15, TimeUnit.SECONDS);
String collectionName = "testSplitMixedReplicaTypes_" + splitMethod.toLower();
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 2, 0, 2); // TODO tlog replicas disabled right now.
create.setMaxShardsPerNode(5); // some high number so we can create replicas without hindrance
@ -374,7 +374,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
splitShard.setShardName(SHARD1);
splitShard.setSplitMethod(splitMethod.toLower());
CollectionAdminResponse rsp = splitShard.process(cloudClient);
waitForThingsToLevelOut(30);
waitForThingsToLevelOut(30, TimeUnit.SECONDS);
cloudClient.waitForState(collectionName, 30, TimeUnit.SECONDS, SolrCloudTestCase.activeClusterShape(2, 12));
@ -411,7 +411,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
@Test
@Nightly
public void testSplitWithChaosMonkey() throws Exception {
waitForThingsToLevelOut(15);
waitForThingsToLevelOut(15, TimeUnit.SECONDS);
log.info("Using legacyCloud=false for cluster");
CollectionAdminRequest.setClusterProperty(ZkStateReader.LEGACY_CLOUD, "false")
@ -564,7 +564,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
@Test
public void testSplitLocking() throws Exception {
waitForThingsToLevelOut(15);
waitForThingsToLevelOut(15, TimeUnit.SECONDS);
String collectionName = "testSplitLocking";
CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(collectionName, "conf1", 1, 2);
create.setMaxShardsPerNode(5); // some high number so we can create replicas without hindrance
@ -634,7 +634,7 @@ public class ShardSplitTest extends BasicDistributedZkTest {
}
private void doSplitShardWithRule(SolrIndexSplitter.SplitMethod splitMethod) throws Exception {
waitForThingsToLevelOut(15);
waitForThingsToLevelOut(15, TimeUnit.SECONDS);
if (usually()) {
log.info("Using legacyCloud=false for cluster");

View File

@ -167,7 +167,7 @@ public class TestCollectionsAPIViaSolrCloudCluster extends SolrCloudTestCase {
CollectionAdminRequest.deleteCollection(collectionName).process(client);
AbstractDistribZkTestBase.waitForCollectionToDisappear
(collectionName, client.getZkStateReader(), true, true, 330);
(collectionName, client.getZkStateReader(), true, 330);
// create it again
createCollection(collectionName, null);
@ -206,7 +206,7 @@ public class TestCollectionsAPIViaSolrCloudCluster extends SolrCloudTestCase {
// delete the collection
CollectionAdminRequest.deleteCollection(collectionName).process(client);
AbstractDistribZkTestBase.waitForCollectionToDisappear
(collectionName, client.getZkStateReader(), true, true, 330);
(collectionName, client.getZkStateReader(), true, 330);
}
@Test

View File

@ -499,7 +499,7 @@ public class BaseCdcrDistributedZkTest extends AbstractDistribZkTestBase {
try {
client.connect();
ZkStateReader zkStateReader = client.getZkStateReader();
AbstractDistribZkTestBase.waitForCollectionToDisappear(collection, zkStateReader, false, true, 15);
AbstractDistribZkTestBase.waitForCollectionToDisappear(collection, zkStateReader, true, 15);
} finally {
client.close();
}

View File

@ -21,6 +21,7 @@ import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.apache.http.client.HttpClient;
@ -60,7 +61,7 @@ public class TestAuthorizationFramework extends AbstractFullDistribZkTestBase {
MockAuthorizationPlugin.denyUsers.add("user1");
try {
waitForThingsToLevelOut(10);
waitForThingsToLevelOut(10, TimeUnit.SECONDS);
String baseUrl = jettys.get(0).getBaseUrl().toString();
verifySecurityStatus(cloudClient.getLbClient().getHttpClient(), baseUrl + "/admin/authorization", "authorization/class", MockAuthorizationPlugin.class.getName(), 20);
log.info("Starting test");

View File

@ -135,7 +135,7 @@ public class TestSolrCloudWithHadoopAuthPlugin extends SolrCloudAuthTestCase {
CollectionAdminRequest.Delete deleteReq = CollectionAdminRequest.deleteCollection(collectionName);
deleteReq.process(solrClient);
AbstractDistribZkTestBase.waitForCollectionToDisappear(collectionName,
solrClient.getZkStateReader(), true, true, 330);
solrClient.getZkStateReader(), true, 330);
assertAuthMetricsMinimums(14, 8, 0, 6, 0, 0);
}
}

View File

@ -55,7 +55,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
private static final String ZOOKEEPER_FORCE_SYNC = "zookeeper.forceSync";
protected static final String DEFAULT_COLLECTION = "collection1";
protected volatile ZkTestServer zkServer;
private AtomicInteger homeCount = new AtomicInteger();
private final AtomicInteger homeCount = new AtomicInteger();
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -141,20 +141,26 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
protected void waitForRecoveriesToFinish(String collection, ZkStateReader zkStateReader, boolean verbose, boolean failOnTimeout)
throws Exception {
waitForRecoveriesToFinish(collection, zkStateReader, verbose, failOnTimeout, 330);
waitForRecoveriesToFinish(collection, zkStateReader, verbose, failOnTimeout, 330, SECONDS);
}
public static void waitForRecoveriesToFinish(String collection,
ZkStateReader zkStateReader, boolean verbose, boolean failOnTimeout, long timeoutSeconds)
throws Exception {
log.info("Wait for recoveries to finish - collection: " + collection + " failOnTimeout:" + failOnTimeout + " timeout (sec):" + timeoutSeconds);
waitForRecoveriesToFinish(collection, zkStateReader, verbose, failOnTimeout, timeoutSeconds, SECONDS);
}
public static void waitForRecoveriesToFinish(String collection,
ZkStateReader zkStateReader, boolean verbose, boolean failOnTimeout, long timeout, TimeUnit unit)
throws Exception {
log.info("Wait for recoveries to finish - collection:{} failOnTimeout:{} timeout:{}{}",
collection, failOnTimeout, timeout, unit);
try {
zkStateReader.waitForState(collection, timeoutSeconds, TimeUnit.SECONDS, (liveNodes, docCollection) -> {
zkStateReader.waitForState(collection, timeout, unit, (liveNodes, docCollection) -> {
if (docCollection == null)
return false;
boolean sawLiveRecovering = false;
assertNotNull("Could not find collection:" + collection, docCollection);
Map<String,Slice> slices = docCollection.getSlicesMap();
assertNotNull("Could not find collection:" + collection, slices);
for (Map.Entry<String,Slice> entry : slices.entrySet()) {
@ -179,12 +185,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
}
}
if (!sawLiveRecovering) {
if (!sawLiveRecovering) {
if (verbose) System.out.println("no one is recoverying");
} else {
if (verbose) System.out.println("Gave up waiting for recovery to finish..");
return false;
}
if (verbose) System.out.println("no one is recoverying");
return true;
} else {
return false;
@ -193,24 +194,20 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
} catch (TimeoutException | InterruptedException e) {
Diagnostics.logThreadDumps("Gave up waiting for recovery to finish. THREAD DUMP:");
zkStateReader.getZkClient().printLayoutToStream(System.out);
fail("There are still nodes recoverying - waited for " + timeoutSeconds + " seconds");
fail("There are still nodes recovering - waited for " + timeout + unit);
}
log.info("Recoveries finished - collection: " + collection);
log.info("Recoveries finished - collection:{}", collection);
}
public static void waitForCollectionToDisappear(String collection,
ZkStateReader zkStateReader, boolean verbose, boolean failOnTimeout, int timeoutSeconds)
ZkStateReader zkStateReader, boolean failOnTimeout, int timeoutSeconds)
throws Exception {
log.info("Wait for collection to disappear - collection: " + collection + " failOnTimeout:" + failOnTimeout + " timeout (sec):" + timeoutSeconds);
zkStateReader.waitForState(collection, timeoutSeconds, TimeUnit.SECONDS, (docCollection) -> {
if (docCollection == null)
return true;
return false;
});
log.info("Collection has disappeared - collection: " + collection);
zkStateReader.waitForState(collection, timeoutSeconds, TimeUnit.SECONDS, (docCollection) -> docCollection == null);
log.info("Collection has disappeared - collection:{}", collection);
}
static void waitForNewLeader(CloudSolrClient cloudClient, String shardName, Replica oldLeader, TimeOut timeOut)
@ -321,7 +318,7 @@ public abstract class AbstractDistribZkTestBase extends BaseDistributedSearchTes
protected void restartZk(int pauseMillis) throws Exception {
log.info("Restarting ZK with a pause of {}ms in between", pauseMillis);
zkServer.shutdown();
// disconnect enough to test stalling, if things stall, then clientSoTimeout w""ill be hit
// disconnect enough to test stalling, if things stall, then clientSoTimeout will be hit
Thread.sleep(pauseMillis);
zkServer = new ZkTestServer(zkServer.getZkDir(), zkServer.getPort());
zkServer.run(false);

View File

@ -1055,7 +1055,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
super.waitForRecoveriesToFinish(collection, zkStateReader, verbose);
}
protected void waitForRecoveriesToFinish(boolean verbose, int timeoutSeconds)
protected void waitForRecoveriesToFinish(boolean verbose, long timeoutSeconds)
throws Exception {
ZkStateReader zkStateReader = cloudClient.getZkStateReader();
super.waitForRecoveriesToFinish(DEFAULT_COLLECTION, zkStateReader, verbose, true, timeoutSeconds);
@ -1329,11 +1329,7 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
try {
SolrParams query = params("q","*:*", "rows","0", "distrib","false", "tests","checkShardConsistency"); // "tests" is just a tag that won't do anything except be echoed in logs
num = cjetty.client.solrClient.query(query).getResults().getNumFound();
} catch (SolrServerException e) {
if (verbose) System.err.println("error contacting client: "
+ e.getMessage() + "\n");
continue;
} catch (SolrException e) {
} catch (SolrException | SolrServerException e) {
if (verbose) System.err.println("error contacting client: "
+ e.getMessage() + "\n");
continue;
@ -1610,36 +1606,40 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
public abstract void safeStop();
}
public void waitForThingsToLevelOut(int waitForRecTimeSeconds) throws Exception {
log.info("Wait for recoveries to finish - wait " + waitForRecTimeSeconds + " for each attempt");
public void waitForThingsToLevelOut() throws Exception {
// Arbitrary, but if we're waiting for longer than 10 minutes, then fail the test anyway
waitForThingsToLevelOut(10, TimeUnit.MINUTES);
}
public void waitForThingsToLevelOut(int timeout, TimeUnit unit) throws Exception {
log.info("Wait for recoveries to finish - wait {}{} for each attempt", timeout, unit);
int cnt = 0;
boolean retry = false;
boolean retry;
do {
waitForRecoveriesToFinish(VERBOSE, waitForRecTimeSeconds);
waitForRecoveriesToFinish(VERBOSE, unit.toSeconds(timeout));
try {
commit();
} catch (Throwable t) {
t.printStackTrace();
} catch (Exception e) {
// we don't care if this commit fails on some nodes
log.info("Commit failed while waiting for recoveries", e);
}
updateMappingsFromZk(jettys, clients);
Set<String> theShards = shardToJetty.keySet();
String failMessage = null;
retry = false;
for (String shard : theShards) {
failMessage = checkShardConsistency(shard, true, false);
String failMessage = checkShardConsistency(shard, true, false);
if (failMessage != null) {
log.info("shard inconsistency - will retry ...");
retry = true;
}
}
if (failMessage != null) {
log.info("shard inconsistency - waiting ...");
retry = true;
} else {
retry = false;
if (cnt++ > 30) {
throw new TimeoutException("Cluster state still in flux after 30 retry intervals.");
}
cnt++;
if (cnt > 30) break;
Thread.sleep(2000);
} while (retry);
}
@ -2036,12 +2036,6 @@ public abstract class AbstractFullDistribZkTestBase extends AbstractDistribZkTes
fail("Could not find the new collection - " + exp.code() + " : " + collectionClient.getBaseURL());
}
protected void assertCollectionNotExists(String collectionName, int timeoutSeconds) throws Exception {
waitForCollectionToDisappear(collectionName, getCommonCloudSolrClient().getZkStateReader(), false, true, timeoutSeconds);
assertFalse(cloudClient.getZkStateReader().getZkClient().exists(ZkStateReader.COLLECTIONS_ZKNODE + "/" + collectionName, true));
}
protected void createCollection(String collName,
CloudSolrClient client,
int replicationFactor ,