diff --git a/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java b/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java index 2581fe429ec..8f5b94f31d5 100644 --- a/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java +++ b/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java @@ -39,7 +39,6 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.util.LuceneTestCase.AwaitsFix; import org.apache.lucene.util.LuceneTestCase.Slow; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrQuery; @@ -76,20 +75,20 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Slow -@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-12313") public class TestTlogReplica extends SolrCloudTestCase { - + private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - + private String collectionName = null; private final static int REPLICATION_TIMEOUT_SECS = 10; - + private String suggestedCollectionName() { return (getTestClass().getSimpleName().replace("Test", "") + "_" + getSaferTestName().split(" ")[0]).replaceAll("(.)(\\p{Upper})", "$1_$2").toLowerCase(Locale.ROOT); } @BeforeClass public static void setupCluster() throws Exception { + System.setProperty("solr.waitToSeeReplicasInStateTimeoutSeconds", "30"); configureCluster(2) // 2 + random().nextInt(3) .addConfig("conf", configset("cloud-minimal-inplace-updates")) .configure(); @@ -99,12 +98,12 @@ public class TestTlogReplica extends SolrCloudTestCase { CollectionAdminResponse response = clusterPropRequest.process(cluster.getSolrClient()); assertEquals(0, response.getStatus()); } - + @AfterClass public static void tearDownCluster() { TestInjection.reset(); } - + @Override public void setUp() throws Exception { super.setUp(); @@ -127,7 +126,7 @@ public class TestTlogReplica extends SolrCloudTestCase { } super.tearDown(); } - + /** * Asserts that Update logs exist for replicas of type {@link org.apache.solr.common.cloud.Replica.Type#NRT}, but not * for replicas of type {@link org.apache.solr.common.cloud.Replica.Type#PULL} @@ -139,7 +138,7 @@ public class TestTlogReplica extends SolrCloudTestCase { try { core = cluster.getReplicaJetty(r).getCoreContainer().getCore(r.getCoreName()); assertNotNull(core); - assertTrue("Update log should exist for replicas of type Append", + assertTrue("Update log should exist for replicas of type Append", new java.io.File(core.getUlogDir()).exists()); } finally { core.close(); @@ -147,99 +146,94 @@ public class TestTlogReplica extends SolrCloudTestCase { } } } - + @Repeat(iterations=2) // 2 times to make sure cleanup is complete and we can create the same collection - // commented out on: 17-Feb-2019 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // added 09-Aug-2018 public void testCreateDelete() throws Exception { - try { - switch (random().nextInt(3)) { - case 0: - CollectionAdminRequest.createCollection(collectionName, "conf", 2, 0, 4, 0) - .setMaxShardsPerNode(100) - .process(cluster.getSolrClient()); - cluster.waitForActiveCollection(collectionName, 2, 8); - break; - case 1: - // Sometimes don't use SolrJ - String url = String.format(Locale.ROOT, "%s/admin/collections?action=CREATE&name=%s&collection.configName=%s&numShards=%s&tlogReplicas=%s&maxShardsPerNode=%s", - cluster.getRandomJetty(random()).getBaseUrl(), - collectionName, "conf", - 2, // numShards - 4, // tlogReplicas - 100); // maxShardsPerNode - HttpGet createCollectionGet = new HttpGet(url); - HttpResponse httpResponse = cluster.getSolrClient().getHttpClient().execute(createCollectionGet); - assertEquals(200, httpResponse.getStatusLine().getStatusCode()); - cluster.waitForActiveCollection(collectionName, 2, 8); - break; - case 2: - // Sometimes use V2 API - url = cluster.getRandomJetty(random()).getBaseUrl().toString() + "/____v2/c"; - String requestBody = String.format(Locale.ROOT, "{create:{name:%s, config:%s, numShards:%s, tlogReplicas:%s, maxShardsPerNode:%s}}", - collectionName, "conf", - 2, // numShards - 4, // tlogReplicas - 100); // maxShardsPerNode - HttpPost createCollectionPost = new HttpPost(url); - createCollectionPost.setHeader("Content-type", "application/json"); - createCollectionPost.setEntity(new StringEntity(requestBody)); - httpResponse = cluster.getSolrClient().getHttpClient().execute(createCollectionPost); - assertEquals(200, httpResponse.getStatusLine().getStatusCode()); - cluster.waitForActiveCollection(collectionName, 2, 8); - break; + switch (random().nextInt(3)) { + case 0: + CollectionAdminRequest.createCollection(collectionName, "conf", 2, 0, 4, 0) + .setMaxShardsPerNode(100) + .process(cluster.getSolrClient()); + cluster.waitForActiveCollection(collectionName, 2, 8); + break; + case 1: + // Sometimes don't use SolrJ + String url = String.format(Locale.ROOT, "%s/admin/collections?action=CREATE&name=%s&collection.configName=%s&numShards=%s&tlogReplicas=%s&maxShardsPerNode=%s", + cluster.getRandomJetty(random()).getBaseUrl(), + collectionName, "conf", + 2, // numShards + 4, // tlogReplicas + 100); // maxShardsPerNode + HttpGet createCollectionGet = new HttpGet(url); + HttpResponse httpResponse = cluster.getSolrClient().getHttpClient().execute(createCollectionGet); + assertEquals(200, httpResponse.getStatusLine().getStatusCode()); + cluster.waitForActiveCollection(collectionName, 2, 8); + break; + case 2: + // Sometimes use V2 API + url = cluster.getRandomJetty(random()).getBaseUrl().toString() + "/____v2/c"; + String requestBody = String.format(Locale.ROOT, "{create:{name:%s, config:%s, numShards:%s, tlogReplicas:%s, maxShardsPerNode:%s}}", + collectionName, "conf", + 2, // numShards + 4, // tlogReplicas + 100); // maxShardsPerNode + HttpPost createCollectionPost = new HttpPost(url); + createCollectionPost.setHeader("Content-type", "application/json"); + createCollectionPost.setEntity(new StringEntity(requestBody)); + httpResponse = cluster.getSolrClient().getHttpClient().execute(createCollectionPost); + assertEquals(200, httpResponse.getStatusLine().getStatusCode()); + cluster.waitForActiveCollection(collectionName, 2, 8); + break; + } + + boolean reloaded = false; + while (true) { + DocCollection docCollection = getCollectionState(collectionName); + assertNotNull(docCollection); + assertEquals("Expecting 2 shards", + 2, docCollection.getSlices().size()); + assertEquals("Expecting 4 relpicas per shard", + 8, docCollection.getReplicas().size()); + assertEquals("Expecting 8 tlog replicas, 4 per shard", + 8, docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).size()); + assertEquals("Expecting no nrt replicas", + 0, docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).size()); + assertEquals("Expecting no pull replicas", + 0, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).size()); + for (Slice s:docCollection.getSlices()) { + assertTrue(s.getLeader().getType() == Replica.Type.TLOG); + List shardElectionNodes = cluster.getZkClient().getChildren(ZkStateReader.getShardLeadersElectPath(collectionName, s.getName()), null, true); + assertEquals("Unexpected election nodes for Shard: " + s.getName() + ": " + Arrays.toString(shardElectionNodes.toArray()), + 4, shardElectionNodes.size()); } - - boolean reloaded = false; - while (true) { - DocCollection docCollection = getCollectionState(collectionName); - assertNotNull(docCollection); - assertEquals("Expecting 2 shards", - 2, docCollection.getSlices().size()); - assertEquals("Expecting 4 relpicas per shard", - 8, docCollection.getReplicas().size()); - assertEquals("Expecting 8 tlog replicas, 4 per shard", - 8, docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).size()); - assertEquals("Expecting no nrt replicas", - 0, docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).size()); - assertEquals("Expecting no pull replicas", - 0, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).size()); - for (Slice s:docCollection.getSlices()) { - assertTrue(s.getLeader().getType() == Replica.Type.TLOG); - List shardElectionNodes = cluster.getZkClient().getChildren(ZkStateReader.getShardLeadersElectPath(collectionName, s.getName()), null, true); - assertEquals("Unexpected election nodes for Shard: " + s.getName() + ": " + Arrays.toString(shardElectionNodes.toArray()), - 4, shardElectionNodes.size()); - } - assertUlogPresence(docCollection); - if (reloaded) { - break; - } else { - // reload - CollectionAdminResponse response = CollectionAdminRequest.reloadCollection(collectionName) - .process(cluster.getSolrClient()); - assertEquals(0, response.getStatus()); - waitForState("failed waiting for active colletion", collectionName, clusterShape(2, 8)); - reloaded = true; - } + assertUlogPresence(docCollection); + if (reloaded) { + break; + } else { + // reload + CollectionAdminResponse response = CollectionAdminRequest.reloadCollection(collectionName) + .process(cluster.getSolrClient()); + assertEquals(0, response.getStatus()); + waitForState("failed waiting for active colletion", collectionName, clusterShape(2, 8)); + reloaded = true; } - } finally { - zkClient().printLayoutToStdOut(); } } - + @SuppressWarnings("unchecked") public void testAddDocs() throws Exception { int numTlogReplicas = 1 + random().nextInt(3); DocCollection docCollection = createAndWaitForCollection(1, 0, numTlogReplicas, 0); assertEquals(1, docCollection.getSlices().size()); - + cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1", "foo", "bar")); cluster.getSolrClient().commit(collectionName); - + Slice s = docCollection.getSlices().iterator().next(); try (HttpSolrClient leaderClient = getHttpSolrClient(s.getLeader().getCoreUrl())) { assertEquals(1, leaderClient.query(new SolrQuery("*:*")).getResults().getNumFound()); } - + TimeOut t = new TimeOut(REPLICATION_TIMEOUT_SECS, TimeUnit.SECONDS, TimeSource.NANO_TIME); for (Replica r:s.getReplicas(EnumSet.of(Replica.Type.TLOG))) { //TODO: assert replication < REPLICATION_TIMEOUT_SECS @@ -253,7 +247,7 @@ public class TestTlogReplica extends SolrCloudTestCase { "qt", "/admin/plugins", "stats", "true"); QueryResponse statsResponse = tlogReplicaClient.query(req); - assertEquals("Append replicas should recive all updates. Replica: " + r + ", response: " + statsResponse, + assertEquals("Append replicas should recive all updates. Replica: " + r + ", response: " + statsResponse, 1L, ((Map)((NamedList)statsResponse.getResponse()).findRecursive("plugins", "UPDATE", "updateHandler", "stats")).get("UPDATE.updateHandler.cumulativeAdds.count")); break; } catch (AssertionError e) { @@ -268,27 +262,27 @@ public class TestTlogReplica extends SolrCloudTestCase { } assertUlogPresence(docCollection); } - + public void testAddRemoveTlogReplica() throws Exception { DocCollection docCollection = createAndWaitForCollection(2, 0, 1, 0); assertEquals(2, docCollection.getSlices().size()); - + addReplicaToShard("shard1", Replica.Type.TLOG); docCollection = assertNumberOfReplicas(0, 3, 0, true, false); addReplicaToShard("shard2", Replica.Type.TLOG); docCollection = assertNumberOfReplicas(0, 4, 0, true, false); - + waitForState("Expecting collection to have 2 shards and 2 replica each", collectionName, clusterShape(2, 4)); - + //Delete tlog replica from shard1 CollectionAdminRequest.deleteReplica( - collectionName, - "shard1", + collectionName, + "shard1", docCollection.getSlice("shard1").getReplicas(EnumSet.of(Replica.Type.TLOG)).get(0).getName()) .process(cluster.getSolrClient()); assertNumberOfReplicas(0, 3, 0, true, true); } - + private void addReplicaToShard(String shardName, Replica.Type type) throws ClientProtocolException, IOException, SolrServerException { switch (random().nextInt(3)) { case 0: // Add replica with SolrJ @@ -296,8 +290,8 @@ public class TestTlogReplica extends SolrCloudTestCase { assertEquals("Unexpected response status: " + response.getStatus(), 0, response.getStatus()); break; case 1: // Add replica with V1 API - String url = String.format(Locale.ROOT, "%s/admin/collections?action=ADDREPLICA&collection=%s&shard=%s&type=%s", - cluster.getRandomJetty(random()).getBaseUrl(), + String url = String.format(Locale.ROOT, "%s/admin/collections?action=ADDREPLICA&collection=%s&shard=%s&type=%s", + cluster.getRandomJetty(random()).getBaseUrl(), collectionName, shardName, type); @@ -306,10 +300,10 @@ public class TestTlogReplica extends SolrCloudTestCase { assertEquals(200, httpResponse.getStatusLine().getStatusCode()); break; case 2:// Add replica with V2 API - url = String.format(Locale.ROOT, "%s/____v2/c/%s/shards", - cluster.getRandomJetty(random()).getBaseUrl(), + url = String.format(Locale.ROOT, "%s/____v2/c/%s/shards", + cluster.getRandomJetty(random()).getBaseUrl(), collectionName); - String requestBody = String.format(Locale.ROOT, "{add-replica:{shard:%s, type:%s}}", + String requestBody = String.format(Locale.ROOT, "{add-replica:{shard:%s, type:%s}}", shardName, type); HttpPost addReplicaPost = new HttpPost(url); @@ -320,15 +314,15 @@ public class TestTlogReplica extends SolrCloudTestCase { break; } } - + public void testRemoveLeader() throws Exception { doReplaceLeader(true); } - + public void testKillLeader() throws Exception { doReplaceLeader(false); } - + public void testRealTimeGet() throws SolrServerException, IOException, KeeperException, InterruptedException { // should be redirected to Replica.Type.REALTIME int numReplicas = random().nextBoolean()?1:2; @@ -373,13 +367,13 @@ public class TestTlogReplica extends SolrCloudTestCase { id++; } } - + /* * validate leader election and that replication still happens on a new leader */ private void doReplaceLeader(boolean removeReplica) throws Exception { DocCollection docCollection = createAndWaitForCollection(1, 0, 2, 0); - + // Add a document and commit cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1", "foo", "bar")); cluster.getSolrClient().commit(collectionName); @@ -387,15 +381,15 @@ public class TestTlogReplica extends SolrCloudTestCase { try (HttpSolrClient leaderClient = getHttpSolrClient(s.getLeader().getCoreUrl())) { assertEquals(1, leaderClient.query(new SolrQuery("*:*")).getResults().getNumFound()); } - + waitForNumDocsInAllReplicas(1, docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)), REPLICATION_TIMEOUT_SECS); - + // Delete leader replica from shard1 JettySolrRunner leaderJetty = null; if (removeReplica) { CollectionAdminRequest.deleteReplica( - collectionName, - "shard1", + collectionName, + "shard1", s.getLeader().getName()) .process(cluster.getSolrClient()); } else { @@ -403,32 +397,24 @@ public class TestTlogReplica extends SolrCloudTestCase { leaderJetty.stop(); waitForState("Leader replica not removed", collectionName, clusterShape(1, 1)); // Wait for cluster state to be updated - waitForState("Replica state not updated in cluster state", + waitForState("Replica state not updated in cluster state", collectionName, clusterStateReflectsActiveAndDownReplicas()); } docCollection = assertNumberOfReplicas(0, 1, 0, true, true); - + // Wait until a new leader is elected - TimeOut t = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME); - while (!t.hasTimedOut()) { - docCollection = getCollectionState(collectionName); - Replica leader = docCollection.getSlice("shard1").getLeader(); - if (leader != null && leader.isActive(cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes())) { - break; - } - Thread.sleep(500); - } - assertFalse("Timeout waiting for a new leader to be elected", t.hasTimedOut()); + waitForLeaderChange(leaderJetty, "shard1"); // There is a new leader, I should be able to add and commit cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "zoo")); cluster.getSolrClient().commit(collectionName); - + // Queries should still work waitForNumDocsInAllReplicas(2, docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)), REPLICATION_TIMEOUT_SECS); // Start back the node if (removeReplica) { - CollectionAdminRequest.addReplicaToShard(collectionName, "shard1", Replica.Type.TLOG).process(cluster.getSolrClient()); + addReplicaWithRetries(); + } else { leaderJetty.start(); } @@ -436,35 +422,51 @@ public class TestTlogReplica extends SolrCloudTestCase { // added replica should replicate from the leader waitForNumDocsInAllReplicas(2, docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)), REPLICATION_TIMEOUT_SECS); } - + + private void addReplicaWithRetries() throws SolrServerException, IOException { + int maxAttempts = 3; + for (int i = 0; i < maxAttempts ; i++) { + try { + CollectionAdminResponse respone = CollectionAdminRequest.addReplicaToShard(collectionName, "shard1", Replica.Type.TLOG).process(cluster.getSolrClient()); + // This is an unfortunate hack. There are cases where the ADDREPLICA fails, will create a Jira to address that separately. for now, we'll retry + if (respone.isSuccess()) { + break; + } + log.error("Unsuccessful atempt to add replica. Attempt: %d/%d", i, maxAttempts); + } catch (SolrException e) { + log.error("Exception while adding replica. Attempt: " + i + "/" + maxAttempts, e); + } + } + } + public void testKillTlogReplica() throws Exception { DocCollection docCollection = createAndWaitForCollection(1, 0, 2, 0); - + waitForNumDocsInAllActiveReplicas(0); cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "1", "foo", "bar")); cluster.getSolrClient().commit(collectionName); waitForNumDocsInAllActiveReplicas(1); - + JettySolrRunner pullReplicaJetty = cluster.getReplicaJetty(docCollection.getSlice("shard1").getReplicas(EnumSet.of(Replica.Type.TLOG)).get(0)); pullReplicaJetty.stop(); waitForState("Replica not removed", collectionName, activeReplicaCount(0, 1, 0)); + waitForLeaderChange(pullReplicaJetty, "shard1"); // // Also wait for the replica to be placed in state="down" // waitForState("Didn't update state", collectionName, clusterStateReflectsActiveAndDownReplicas()); - + cluster.getSolrClient().add(collectionName, new SolrInputDocument("id", "2", "foo", "bar")); cluster.getSolrClient().commit(collectionName); waitForNumDocsInAllActiveReplicas(2); - + pullReplicaJetty.start(); waitForState("Replica not added", collectionName, activeReplicaCount(0, 2, 0)); waitForNumDocsInAllActiveReplicas(2); } @Test - // Removed BadApple on 2018-05-21 public void testOnlyLeaderIndexes() throws Exception { createAndWaitForCollection(1, 0, 2, 0); - + CloudSolrClient cloudClient = cluster.getSolrClient(); new UpdateRequest() .add(sdoc("id", "1")) @@ -525,13 +527,13 @@ public class TestTlogReplica extends SolrCloudTestCase { } } checkRTG(120,150, cluster.getJettySolrRunners()); - waitForReplicasCatchUp(20); + waitForReplicasCatchUp(4 * REPLICATION_TIMEOUT_SECS); } - + @SuppressWarnings("unchecked") public void testRecovery() throws Exception { createAndWaitForCollection(1, 0, 2, 0); - + CloudSolrClient cloudClient = cluster.getSolrClient(); new UpdateRequest() .add(sdoc("id", "3")) @@ -551,13 +553,12 @@ public class TestTlogReplica extends SolrCloudTestCase { // We skip peerSync, so replica will always trigger commit on leader // We query only the non-leader replicas, since we haven't opened a new searcher on the leader yet waitForNumDocsInAllReplicas(4, getNonLeaderReplias(collectionName), 10); //timeout for stale collection state - + // If I add the doc immediately, the leader fails to communicate with the follower with broken pipe. // Options are, wait or retry... for (int i = 0; i < 3; i++) { UpdateRequest ureq = new UpdateRequest().add(sdoc("id", "7")); ureq.setParam("collection", collectionName); - ureq.setParam(UpdateRequest.MIN_REPFACT, "2"); NamedList response = cloudClient.request(ureq); if ((Integer)((NamedList)response.get("responseHeader")).get(UpdateRequest.REPFACT) >= 2) { break; @@ -595,7 +596,6 @@ public class TestTlogReplica extends SolrCloudTestCase { for (int i = 0; i < 3; i++) { UpdateRequest ureq = new UpdateRequest().add(sdoc("id", "8")); ureq.setParam("collection", collectionName); - ureq.setParam(UpdateRequest.MIN_REPFACT, "2"); NamedList response = cloudClient.request(ureq); if ((Integer)((NamedList)response.get("responseHeader")).get(UpdateRequest.REPFACT) >= 2) { break; @@ -616,12 +616,12 @@ public class TestTlogReplica extends SolrCloudTestCase { iwRef.decref(); } } - + private List getNonLeaderReplias(String collectionName) { return getCollectionState(collectionName).getReplicas().stream().filter( (r)-> !r.getBool("leader", false)).collect(Collectors.toList()); } - + public void testDeleteById() throws Exception{ createAndWaitForCollection(1,0,2,0); CloudSolrClient cloudClient = cluster.getSolrClient(); @@ -644,7 +644,7 @@ public class TestTlogReplica extends SolrCloudTestCase { } assertFalse("Doc1 is deleted but it's still exist", successs); } - + public void testBasicLeaderElection() throws Exception { createAndWaitForCollection(1,0,2,0); CloudSolrClient cloudClient = cluster.getSolrClient(); @@ -658,7 +658,11 @@ public class TestTlogReplica extends SolrCloudTestCase { JettySolrRunner oldLeaderJetty = getSolrRunner(true).get(0); oldLeaderJetty.stop(); waitForState("Replica not removed", collectionName, activeReplicaCount(0, 1, 0)); - new UpdateRequest() + + // Even after the replica is gone, a leader may not be elected yet. Wait for it. + waitForLeaderChange(oldLeaderJetty, "shard1"); + + new UpdateRequest() .add(sdoc("id", "3")) .add(sdoc("id", "4")) .process(cloudClient, collectionName); @@ -669,7 +673,19 @@ public class TestTlogReplica extends SolrCloudTestCase { .commit(cloudClient, collectionName); waitForNumDocsInAllActiveReplicas(4, 0); } - + + private void waitForLeaderChange(JettySolrRunner oldLeaderJetty, String shardName) { + waitForState("Expect new leader", collectionName, + (liveNodes, collectionState) -> { + Replica leader = collectionState.getLeader(shardName); + if (leader == null || !leader.isActive(cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes())) { + return false; + } + return oldLeaderJetty == null || !leader.getNodeName().equals(oldLeaderJetty.getNodeName()); + } + ); + } + public void testOutOfOrderDBQWithInPlaceUpdates() throws Exception { createAndWaitForCollection(1,0,2,0); assertFalse(getSolrCore(true).get(0).getLatestSchema().getField("inplace_updatable_int").indexed()); @@ -687,23 +703,16 @@ public class TestTlogReplica extends SolrCloudTestCase { } } JettySolrRunner oldLeaderJetty = getSolrRunner(true).get(0); - String oldLeaderNodeName = oldLeaderJetty.getNodeName(); oldLeaderJetty.stop(); waitForState("Replica not removed", collectionName, activeReplicaCount(0, 1, 0)); - waitForState("Expect new leader", collectionName, - (liveNodes, collectionState) -> { - Replica leader = collectionState.getLeader("shard1"); - if (leader == null) return false; - return !leader.getNodeName().equals(oldLeaderNodeName); - } - ); + waitForLeaderChange(oldLeaderJetty, "shard1"); oldLeaderJetty.start(); waitForState("Replica not added", collectionName, activeReplicaCount(0, 2, 0)); checkRTG(1,1, cluster.getJettySolrRunners()); SolrDocument doc = cluster.getSolrClient().getById(collectionName,"1"); assertNotNull(doc.get("title_s")); } - + private UpdateRequest simulatedUpdateRequest(Long prevVersion, Object... fields) throws SolrServerException, IOException { SolrInputDocument doc = sdoc(fields); @@ -747,20 +756,20 @@ public class TestTlogReplica extends SolrCloudTestCase { collectionName, clusterShape(numShards, numShards * numReplicasPerShard)); return assertNumberOfReplicas(numNrtReplicas*numShards, numTlogReplicas*numShards, numPullReplicas*numShards, false, true); } - + private void waitForNumDocsInAllActiveReplicas(int numDocs) throws IOException, SolrServerException, InterruptedException { waitForNumDocsInAllActiveReplicas(numDocs, REPLICATION_TIMEOUT_SECS); } - + private void waitForNumDocsInAllActiveReplicas(int numDocs, int timeout) throws IOException, SolrServerException, InterruptedException { DocCollection docCollection = getCollectionState(collectionName); waitForNumDocsInAllReplicas(numDocs, docCollection.getReplicas().stream().filter(r -> r.getState() == Replica.State.ACTIVE).collect(Collectors.toList()), timeout); } - + private void waitForNumDocsInAllReplicas(int numDocs, Collection replicas, int timeout) throws IOException, SolrServerException, InterruptedException { waitForNumDocsInAllReplicas(numDocs, replicas, "*:*", timeout); } - + private void waitForNumDocsInAllReplicas(int numDocs, Collection replicas, String query, int timeout) throws IOException, SolrServerException, InterruptedException { TimeOut t = new TimeOut(timeout, TimeUnit.SECONDS, TimeSource.NANO_TIME); for (Replica r:replicas) { @@ -784,7 +793,7 @@ public class TestTlogReplica extends SolrCloudTestCase { } } } - + private void waitForDeletion(String collection) throws InterruptedException, KeeperException { TimeOut t = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME); while (cluster.getSolrClient().getZkStateReader().getClusterState().hasCollection(collection)) { @@ -797,25 +806,25 @@ public class TestTlogReplica extends SolrCloudTestCase { } catch(SolrException e) { return; } - + } } - + private DocCollection assertNumberOfReplicas(int numNrtReplicas, int numTlogReplicas, int numPullReplicas, boolean updateCollection, boolean activeOnly) throws KeeperException, InterruptedException { if (updateCollection) { cluster.getSolrClient().getZkStateReader().forceUpdateCollection(collectionName); } DocCollection docCollection = getCollectionState(collectionName); assertNotNull(docCollection); - assertEquals("Unexpected number of nrt replicas: " + docCollection, numNrtReplicas, + assertEquals("Unexpected number of nrt replicas: " + docCollection, numNrtReplicas, docCollection.getReplicas(EnumSet.of(Replica.Type.NRT)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count()); - assertEquals("Unexpected number of pull replicas: " + docCollection, numPullReplicas, + assertEquals("Unexpected number of pull replicas: " + docCollection, numPullReplicas, docCollection.getReplicas(EnumSet.of(Replica.Type.PULL)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count()); - assertEquals("Unexpected number of tlog replicas: " + docCollection, numTlogReplicas, + assertEquals("Unexpected number of tlog replicas: " + docCollection, numTlogReplicas, docCollection.getReplicas(EnumSet.of(Replica.Type.TLOG)).stream().filter(r->!activeOnly || r.getState() == Replica.State.ACTIVE).count()); return docCollection; } - + /* * passes only if all replicas are active or down, and the "liveNodes" reflect the same status */ @@ -835,8 +844,8 @@ public class TestTlogReplica extends SolrCloudTestCase { return true; }; } - - + + private CollectionStatePredicate activeReplicaCount(int numNrtReplicas, int numTlogReplicas, int numPullReplicas) { return (liveNodes, collectionState) -> { int nrtFound = 0, tlogFound = 0, pullFound = 0; @@ -863,7 +872,7 @@ public class TestTlogReplica extends SolrCloudTestCase { return numNrtReplicas == nrtFound && numTlogReplicas == tlogFound && numPullReplicas == pullFound; }; } - + private List getSolrCore(boolean isLeader) { List rs = new ArrayList<>(); @@ -885,7 +894,7 @@ public class TestTlogReplica extends SolrCloudTestCase { } return rs; } - + private void checkRTG(int from, int to, List solrRunners) throws Exception{ for (JettySolrRunner solrRunner: solrRunners) { try (SolrClient client = solrRunner.newClient()) { @@ -900,7 +909,7 @@ public class TestTlogReplica extends SolrCloudTestCase { } } } - + private List getSolrRunner(boolean isLeader) { List rs = new ArrayList<>(); CloudSolrClient cloudClient = cluster.getSolrClient(); @@ -920,7 +929,7 @@ public class TestTlogReplica extends SolrCloudTestCase { } return rs; } - + private void waitForReplicasCatchUp(int numTry) throws IOException, InterruptedException { String leaderTimeCommit = getSolrCore(true).get(0).getDeletionPolicy().getLatestCommit().getUserData().get(SolrIndexWriter.COMMIT_TIME_MSEC_KEY); if (leaderTimeCommit == null) return;