diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 31a86a22d6d..18b815acea8 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -143,7 +143,9 @@ Optimizations Bug Fixes --------------------- -(No changes) + +* SOLR-11656: TLOG replication doesn't work properly after rebalancing leaders. (Yuki Yano via + Erick Erickson) Other Changes --------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java index ca44ece55cf..dc4c210e7db 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java +++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java @@ -2215,6 +2215,20 @@ public class ZkController implements Closeable { electionContexts.put(contextKey, context); elect.retryElection(context, params.getBool(REJOIN_AT_HEAD_PROP, false)); + + try (SolrCore core = cc.getCore(coreName)) { + Replica.Type replicaType = core.getCoreDescriptor().getCloudDescriptor().getReplicaType(); + if (replicaType == Type.TLOG) { + String leaderUrl = getLeader(core.getCoreDescriptor().getCloudDescriptor(), cloudConfig.getLeaderVoteWait()); + String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName); + if (!leaderUrl.equals(ourUrl)) { + // restart the replication thread to ensure the replication is running in each new replica + // especially if previous role is "leader" (i.e., no replication thread) + stopReplicationFromLeader(coreName); + startReplicationFromLeader(coreName, false); + } + } + } } catch (Exception e) { throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to rejoin election", e); } finally { diff --git a/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java b/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java index d68f46b632f..bbe86a7f992 100644 --- a/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java +++ b/solr/core/src/test/org/apache/solr/cloud/TestTlogReplica.java @@ -47,6 +47,7 @@ import org.apache.solr.client.solrj.embedded.JettySolrRunner; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.request.QueryRequest; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.response.CollectionAdminResponse; import org.apache.solr.client.solrj.response.QueryResponse; @@ -59,6 +60,8 @@ import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.CollectionParams; +import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.util.NamedList; import org.apache.solr.common.util.TimeSource; import org.apache.solr.core.SolrCore; @@ -669,7 +672,81 @@ public class TestTlogReplica extends SolrCloudTestCase { .commit(cloudClient, collectionName); waitForNumDocsInAllActiveReplicas(4, 0); } + public void testRebalanceLeaders() throws Exception { + createAndWaitForCollection(1,0,2,0); + CloudSolrClient cloudClient = cluster.getSolrClient(); + new UpdateRequest() + .deleteByQuery("*:*") + .commit(cluster.getSolrClient(), collectionName); + // Find a replica which isn't leader + DocCollection docCollection = cloudClient.getZkStateReader().getClusterState().getCollection(collectionName); + Slice slice = docCollection.getSlices().iterator().next(); + Replica newLeader = null; + for (Replica replica : slice.getReplicas()) { + if (slice.getLeader() == replica) continue; + newLeader = replica; + break; + } + assertNotNull("Failed to find a candidate of new leader", newLeader); + + // Set preferredLeader flag to the replica + ModifiableSolrParams params = new ModifiableSolrParams(); + params.set("action", CollectionParams.CollectionAction.ADDREPLICAPROP.toString()); + params.set("collection", collectionName); + params.set("shard", slice.getName()); + params.set("replica", newLeader.getName()); + params.set("property", "preferredLeader"); + params.set("property.value", "true"); + QueryRequest request = new QueryRequest(params); + request.setPath("/admin/collections"); + cloudClient.request(request); + + // Wait until a preferredleader flag is set to the new leader candidate + TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME); + while (!timeout.hasTimedOut()) { + Map slices = cloudClient.getZkStateReader().getClusterState().getCollection(collectionName).getSlicesMap(); + Replica me = slices.get(slice.getName()).getReplica(newLeader.getName()); + if (me.getBool("property.preferredleader", false)) { + break; + } + Thread.sleep(100); + } + assertFalse("Timeout waiting for setting preferredleader flag", timeout.hasTimedOut()); + + // Rebalance leaders + params = new ModifiableSolrParams(); + params.set("action", CollectionParams.CollectionAction.REBALANCELEADERS.toString()); + params.set("collection", collectionName); + params.set("maxAtOnce", "10"); + request = new QueryRequest(params); + request.setPath("/admin/collections"); + cloudClient.request(request); + + // Wait until a new leader is elected + timeout = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME); + while (!timeout.hasTimedOut()) { + docCollection = getCollectionState(collectionName); + Replica leader = docCollection.getSlice(slice.getName()).getLeader(); + if (leader != null && leader.getName().equals(newLeader.getName()) && + leader.isActive(cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes())) { + break; + } + Thread.sleep(100); + } + assertFalse("Timeout waiting for a new leader to be elected", timeout.hasTimedOut()); + + new UpdateRequest() + .add(sdoc("id", "1")) + .add(sdoc("id", "2")) + .add(sdoc("id", "3")) + .add(sdoc("id", "4")) + .process(cloudClient, collectionName); + checkRTG(1,4, cluster.getJettySolrRunners()); + new UpdateRequest() + .commit(cloudClient, collectionName); + waitForNumDocsInAllActiveReplicas(4); + } private void waitForLeaderChange(JettySolrRunner oldLeaderJetty, String shardName) { waitForState("Expect new leader", collectionName, (liveNodes, collectionState) -> {