mirror of https://github.com/apache/lucene.git
SOLR-11656: TLOG replication doesn't work properly after rebalancing leaders.
This commit is contained in:
parent
ec007c4bf2
commit
4b2e90b3aa
|
@ -143,7 +143,9 @@ Optimizations
|
||||||
|
|
||||||
Bug Fixes
|
Bug Fixes
|
||||||
---------------------
|
---------------------
|
||||||
(No changes)
|
|
||||||
|
* SOLR-11656: TLOG replication doesn't work properly after rebalancing leaders. (Yuki Yano via
|
||||||
|
Erick Erickson)
|
||||||
|
|
||||||
Other Changes
|
Other Changes
|
||||||
---------------------
|
---------------------
|
||||||
|
|
|
@ -2215,6 +2215,20 @@ public class ZkController implements Closeable {
|
||||||
electionContexts.put(contextKey, context);
|
electionContexts.put(contextKey, context);
|
||||||
|
|
||||||
elect.retryElection(context, params.getBool(REJOIN_AT_HEAD_PROP, false));
|
elect.retryElection(context, params.getBool(REJOIN_AT_HEAD_PROP, false));
|
||||||
|
|
||||||
|
try (SolrCore core = cc.getCore(coreName)) {
|
||||||
|
Replica.Type replicaType = core.getCoreDescriptor().getCloudDescriptor().getReplicaType();
|
||||||
|
if (replicaType == Type.TLOG) {
|
||||||
|
String leaderUrl = getLeader(core.getCoreDescriptor().getCloudDescriptor(), cloudConfig.getLeaderVoteWait());
|
||||||
|
String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
|
||||||
|
if (!leaderUrl.equals(ourUrl)) {
|
||||||
|
// restart the replication thread to ensure the replication is running in each new replica
|
||||||
|
// especially if previous role is "leader" (i.e., no replication thread)
|
||||||
|
stopReplicationFromLeader(coreName);
|
||||||
|
startReplicationFromLeader(coreName, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to rejoin election", e);
|
throw new SolrException(ErrorCode.SERVER_ERROR, "Unable to rejoin election", e);
|
||||||
} finally {
|
} finally {
|
||||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.solr.client.solrj.embedded.JettySolrRunner;
|
||||||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||||
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
|
||||||
|
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||||
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
|
import org.apache.solr.client.solrj.response.CollectionAdminResponse;
|
||||||
import org.apache.solr.client.solrj.response.QueryResponse;
|
import org.apache.solr.client.solrj.response.QueryResponse;
|
||||||
|
@ -59,6 +60,8 @@ import org.apache.solr.common.cloud.DocCollection;
|
||||||
import org.apache.solr.common.cloud.Replica;
|
import org.apache.solr.common.cloud.Replica;
|
||||||
import org.apache.solr.common.cloud.Slice;
|
import org.apache.solr.common.cloud.Slice;
|
||||||
import org.apache.solr.common.cloud.ZkStateReader;
|
import org.apache.solr.common.cloud.ZkStateReader;
|
||||||
|
import org.apache.solr.common.params.CollectionParams;
|
||||||
|
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||||
import org.apache.solr.common.util.NamedList;
|
import org.apache.solr.common.util.NamedList;
|
||||||
import org.apache.solr.common.util.TimeSource;
|
import org.apache.solr.common.util.TimeSource;
|
||||||
import org.apache.solr.core.SolrCore;
|
import org.apache.solr.core.SolrCore;
|
||||||
|
@ -669,7 +672,81 @@ public class TestTlogReplica extends SolrCloudTestCase {
|
||||||
.commit(cloudClient, collectionName);
|
.commit(cloudClient, collectionName);
|
||||||
waitForNumDocsInAllActiveReplicas(4, 0);
|
waitForNumDocsInAllActiveReplicas(4, 0);
|
||||||
}
|
}
|
||||||
|
public void testRebalanceLeaders() throws Exception {
|
||||||
|
createAndWaitForCollection(1,0,2,0);
|
||||||
|
CloudSolrClient cloudClient = cluster.getSolrClient();
|
||||||
|
new UpdateRequest()
|
||||||
|
.deleteByQuery("*:*")
|
||||||
|
.commit(cluster.getSolrClient(), collectionName);
|
||||||
|
|
||||||
|
// Find a replica which isn't leader
|
||||||
|
DocCollection docCollection = cloudClient.getZkStateReader().getClusterState().getCollection(collectionName);
|
||||||
|
Slice slice = docCollection.getSlices().iterator().next();
|
||||||
|
Replica newLeader = null;
|
||||||
|
for (Replica replica : slice.getReplicas()) {
|
||||||
|
if (slice.getLeader() == replica) continue;
|
||||||
|
newLeader = replica;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
assertNotNull("Failed to find a candidate of new leader", newLeader);
|
||||||
|
|
||||||
|
// Set preferredLeader flag to the replica
|
||||||
|
ModifiableSolrParams params = new ModifiableSolrParams();
|
||||||
|
params.set("action", CollectionParams.CollectionAction.ADDREPLICAPROP.toString());
|
||||||
|
params.set("collection", collectionName);
|
||||||
|
params.set("shard", slice.getName());
|
||||||
|
params.set("replica", newLeader.getName());
|
||||||
|
params.set("property", "preferredLeader");
|
||||||
|
params.set("property.value", "true");
|
||||||
|
QueryRequest request = new QueryRequest(params);
|
||||||
|
request.setPath("/admin/collections");
|
||||||
|
cloudClient.request(request);
|
||||||
|
|
||||||
|
// Wait until a preferredleader flag is set to the new leader candidate
|
||||||
|
TimeOut timeout = new TimeOut(10, TimeUnit.SECONDS, TimeSource.NANO_TIME);
|
||||||
|
while (!timeout.hasTimedOut()) {
|
||||||
|
Map<String, Slice> slices = cloudClient.getZkStateReader().getClusterState().getCollection(collectionName).getSlicesMap();
|
||||||
|
Replica me = slices.get(slice.getName()).getReplica(newLeader.getName());
|
||||||
|
if (me.getBool("property.preferredleader", false)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
assertFalse("Timeout waiting for setting preferredleader flag", timeout.hasTimedOut());
|
||||||
|
|
||||||
|
// Rebalance leaders
|
||||||
|
params = new ModifiableSolrParams();
|
||||||
|
params.set("action", CollectionParams.CollectionAction.REBALANCELEADERS.toString());
|
||||||
|
params.set("collection", collectionName);
|
||||||
|
params.set("maxAtOnce", "10");
|
||||||
|
request = new QueryRequest(params);
|
||||||
|
request.setPath("/admin/collections");
|
||||||
|
cloudClient.request(request);
|
||||||
|
|
||||||
|
// Wait until a new leader is elected
|
||||||
|
timeout = new TimeOut(30, TimeUnit.SECONDS, TimeSource.NANO_TIME);
|
||||||
|
while (!timeout.hasTimedOut()) {
|
||||||
|
docCollection = getCollectionState(collectionName);
|
||||||
|
Replica leader = docCollection.getSlice(slice.getName()).getLeader();
|
||||||
|
if (leader != null && leader.getName().equals(newLeader.getName()) &&
|
||||||
|
leader.isActive(cluster.getSolrClient().getZkStateReader().getClusterState().getLiveNodes())) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
assertFalse("Timeout waiting for a new leader to be elected", timeout.hasTimedOut());
|
||||||
|
|
||||||
|
new UpdateRequest()
|
||||||
|
.add(sdoc("id", "1"))
|
||||||
|
.add(sdoc("id", "2"))
|
||||||
|
.add(sdoc("id", "3"))
|
||||||
|
.add(sdoc("id", "4"))
|
||||||
|
.process(cloudClient, collectionName);
|
||||||
|
checkRTG(1,4, cluster.getJettySolrRunners());
|
||||||
|
new UpdateRequest()
|
||||||
|
.commit(cloudClient, collectionName);
|
||||||
|
waitForNumDocsInAllActiveReplicas(4);
|
||||||
|
}
|
||||||
private void waitForLeaderChange(JettySolrRunner oldLeaderJetty, String shardName) {
|
private void waitForLeaderChange(JettySolrRunner oldLeaderJetty, String shardName) {
|
||||||
waitForState("Expect new leader", collectionName,
|
waitForState("Expect new leader", collectionName,
|
||||||
(liveNodes, collectionState) -> {
|
(liveNodes, collectionState) -> {
|
||||||
|
|
Loading…
Reference in New Issue