mirror of https://github.com/apache/lucene.git
SOLR-13141: CDCR bootstrap does not replicate index to the replicas of target cluster.
The leader node on the target cluster will now increment its term after bootstrap succeeds so that all replicas of this leader are forced to recover and fetch the latest index from the leader.
This commit is contained in:
parent
2eb493d170
commit
e59f41b671
|
@ -140,6 +140,9 @@ Bug Fixes
|
|||
has been added to NodeAddTrigger so that new replicas of the given type are added. The default value is `NRT`.
|
||||
(Irena Shaigorodsky via shalin)
|
||||
|
||||
* SOLR-13141: CDCR bootstrap does not replicate index to the replicas of target cluster.
|
||||
(Krzysztof Watral, Amrit Sarkar, Tim, Tdspringsteen, shalin)
|
||||
|
||||
Other Changes
|
||||
----------------------
|
||||
|
||||
|
|
|
@ -20,7 +20,6 @@ import java.io.Closeable;
|
|||
import java.io.IOException;
|
||||
import java.lang.invoke.MethodHandles;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
|
@ -37,14 +36,11 @@ import org.apache.solr.client.solrj.SolrServerException;
|
|||
import org.apache.solr.client.solrj.impl.CloudSolrClient;
|
||||
import org.apache.solr.client.solrj.impl.CloudSolrClient.Builder;
|
||||
import org.apache.solr.client.solrj.impl.HttpSolrClient;
|
||||
import org.apache.solr.client.solrj.request.CoreAdminRequest;
|
||||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkCoreNodeProps;
|
||||
import org.apache.solr.common.params.CommonParams;
|
||||
import org.apache.solr.common.params.CoreAdminParams;
|
||||
import org.apache.solr.common.params.ModifiableSolrParams;
|
||||
import org.apache.solr.common.params.SolrParams;
|
||||
import org.apache.solr.common.util.ExecutorUtil;
|
||||
|
@ -303,8 +299,6 @@ class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
|
|||
checkpoint, collectionName, shard);
|
||||
CdcrUpdateLog.CdcrLogReader reader1 = ulog.newLogReader();
|
||||
reader1.seek(checkpoint);
|
||||
// issue an asynchronous request_recovery to the follower nodes of the shards of the target collection
|
||||
sendRequestRecoveryToFollowers(state);
|
||||
success = true;
|
||||
break;
|
||||
} else if (status == BootstrapStatus.FAILED) {
|
||||
|
@ -418,29 +412,6 @@ class CdcrReplicatorManager implements CdcrStateManager.CdcrStateObserver {
|
|||
return client.request(request);
|
||||
}
|
||||
|
||||
private void sendRequestRecoveryToFollowers(CdcrReplicatorState state) throws SolrServerException, IOException {
|
||||
Collection<Slice> slices = state.getClient().getZkStateReader().getClusterState().getCollection(state.getTargetCollection()).getActiveSlices();
|
||||
for (Slice slice : slices) {
|
||||
Collection<Replica> replicas = slice.getReplicas();
|
||||
for (Replica replica : replicas) {
|
||||
if (slice.getLeader().getCoreName().equals(replica.getCoreName())) {
|
||||
continue; // no need to request recovery for leader
|
||||
}
|
||||
sendRequestRecoveryToFollower(state.getClient(), replica.getCoreName());
|
||||
log.info("RequestRecovery cmd is issued by core: " + replica.getCoreName() + " of shard: " + slice.getName() +
|
||||
"for target: " + state.getTargetCollection());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private NamedList sendRequestRecoveryToFollower(SolrClient client, String coreName) throws SolrServerException, IOException {
|
||||
CoreAdminRequest.RequestRecovery recoverRequestCmd = new CoreAdminRequest.RequestRecovery();
|
||||
recoverRequestCmd.setAction(CoreAdminParams.CoreAdminAction.REQUESTRECOVERY);
|
||||
recoverRequestCmd.setCoreName(coreName);
|
||||
return client.request(recoverRequestCmd);
|
||||
}
|
||||
|
||||
|
||||
private enum BootstrapStatus {
|
||||
SUBMITTED,
|
||||
RUNNING,
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.util.Collection;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
@ -33,6 +34,7 @@ import java.util.concurrent.Future;
|
|||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.solr.client.solrj.SolrRequest;
|
||||
import org.apache.solr.client.solrj.SolrServerException;
|
||||
|
@ -41,9 +43,11 @@ import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
|
|||
import org.apache.solr.client.solrj.request.QueryRequest;
|
||||
import org.apache.solr.client.solrj.request.UpdateRequest;
|
||||
import org.apache.solr.cloud.ZkController;
|
||||
import org.apache.solr.cloud.ZkShardTerms;
|
||||
import org.apache.solr.common.SolrException;
|
||||
import org.apache.solr.common.cloud.ClusterState;
|
||||
import org.apache.solr.common.cloud.DocCollection;
|
||||
import org.apache.solr.common.cloud.Replica;
|
||||
import org.apache.solr.common.cloud.Slice;
|
||||
import org.apache.solr.common.cloud.ZkCoreNodeProps;
|
||||
import org.apache.solr.common.cloud.ZkNodeProps;
|
||||
|
@ -786,6 +790,17 @@ public class CdcrRequestHandler extends RequestHandlerBase implements SolrCoreAw
|
|||
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Replay failed");
|
||||
}
|
||||
}
|
||||
if (success) {
|
||||
ZkController zkController = core.getCoreContainer().getZkController();
|
||||
String collectionName = core.getCoreDescriptor().getCollectionName();
|
||||
ClusterState clusterState = zkController.getZkStateReader().getClusterState();
|
||||
DocCollection collection = clusterState.getCollection(collectionName);
|
||||
Slice slice = collection.getSlice(core.getCoreDescriptor().getCloudDescriptor().getShardId());
|
||||
ZkShardTerms terms = zkController.getShardTerms(collectionName, slice.getName());
|
||||
String coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
|
||||
Set<String> allExceptLeader = slice.getReplicas().stream().filter(replica -> !replica.getName().equals(coreNodeName)).map(Replica::getName).collect(Collectors.toSet());
|
||||
terms.ensureTermsIsHigher(coreNodeName, allExceptLeader);
|
||||
}
|
||||
return success;
|
||||
} finally {
|
||||
if (closed || !success) {
|
||||
|
|
|
@ -241,6 +241,58 @@ public class CdcrBootstrapTest extends SolrTestCaseJ4 {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test validates that the follower nodes at the target copy content
|
||||
* from their respective leaders
|
||||
*/
|
||||
public void testBootstrapWithMultipleReplicas() throws Exception {
|
||||
// start the target first so that we know its zkhost
|
||||
MiniSolrCloudCluster target = new MiniSolrCloudCluster(3, createTempDir("cdcr-target"), buildJettyConfig("/solr"));
|
||||
try {
|
||||
System.out.println("Target zkHost = " + target.getZkServer().getZkAddress());
|
||||
System.setProperty("cdcr.target.zkHost", target.getZkServer().getZkAddress());
|
||||
|
||||
MiniSolrCloudCluster source = new MiniSolrCloudCluster(3, createTempDir("cdcr-source"), buildJettyConfig("/solr"));
|
||||
try {
|
||||
source.uploadConfigSet(configset("cdcr-source"), "cdcr-source");
|
||||
|
||||
CollectionAdminRequest.createCollection("cdcr-source", "cdcr-source", 1, 3)
|
||||
.withProperty("solr.directoryFactory", "solr.StandardDirectoryFactory")
|
||||
.process(source.getSolrClient());
|
||||
source.waitForActiveCollection("cdcr-source", 1, 3);
|
||||
|
||||
CloudSolrClient sourceSolrClient = source.getSolrClient();
|
||||
int docs = (TEST_NIGHTLY ? 100 : 10);
|
||||
int numDocs = indexDocs(sourceSolrClient, "cdcr-source", docs);
|
||||
|
||||
QueryResponse response = sourceSolrClient.query(new SolrQuery("*:*"));
|
||||
assertEquals("", numDocs, response.getResults().getNumFound());
|
||||
|
||||
// setup the target cluster
|
||||
target.uploadConfigSet(configset("cdcr-target"), "cdcr-target");
|
||||
CollectionAdminRequest.createCollection("cdcr-target", "cdcr-target", 1, 3)
|
||||
.process(target.getSolrClient());
|
||||
target.waitForActiveCollection("cdcr-target", 1, 3);
|
||||
CloudSolrClient targetSolrClient = target.getSolrClient();
|
||||
targetSolrClient.setDefaultCollection("cdcr-target");
|
||||
|
||||
CdcrTestsUtil.cdcrStart(targetSolrClient);
|
||||
CdcrTestsUtil.cdcrStart(sourceSolrClient);
|
||||
|
||||
response = CdcrTestsUtil.getCdcrQueue(sourceSolrClient);
|
||||
log.info("Cdcr queue response: " + response.getResponse());
|
||||
long foundDocs = CdcrTestsUtil.waitForClusterToSync(numDocs, targetSolrClient);
|
||||
assertEquals("Document mismatch on target after sync", numDocs, foundDocs);
|
||||
assertTrue("leader followers didnt' match", CdcrTestsUtil.assertShardInSync("cdcr-target", "shard1", targetSolrClient)); // with more than 1 replica
|
||||
|
||||
} finally {
|
||||
source.shutdown();
|
||||
}
|
||||
} finally {
|
||||
target.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
// 29-June-2018 @BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028")
|
||||
@BadApple(bugUrl="https://issues.apache.org/jira/browse/SOLR-12028") // 6-Sep-2018
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue