From effd22457691420982534f47ee71cd52ef64b8b9 Mon Sep 17 00:00:00 2001 From: Shalin Shekhar Mangar Date: Thu, 29 Sep 2016 17:08:22 +0530 Subject: [PATCH] SOLR-9504: A replica with an empty index becomes the leader even when other more qualified replicas are in line (cherry picked from commit ce24de5) --- solr/CHANGES.txt | 3 + .../apache/solr/cloud/ElectionContext.java | 22 ++- .../apache/solr/cloud/RecoveryStrategy.java | 2 +- .../org/apache/solr/cloud/SyncStrategy.java | 34 +++-- .../handler/admin/RequestSyncShardOp.java | 2 +- .../component/RealTimeGetComponent.java | 2 +- .../java/org/apache/solr/update/PeerSync.java | 57 ++++++-- .../TestLeaderElectionWithEmptyReplica.java | 125 ++++++++++++++++++ 8 files changed, 211 insertions(+), 36 deletions(-) create mode 100644 solr/core/src/test/org/apache/solr/cloud/TestLeaderElectionWithEmptyReplica.java diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 8f67431ff20..5a9cf5ae6ef 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -101,6 +101,9 @@ Bug Fixes * SOLR-9411: Better validation for Schema API add-field and add-dynamic-field (janhoy, Steve Rowe) +* SOLR-9504: A replica with an empty index becomes the leader even when other more qualified replicas + are in line. (shalin) + Optimizations ---------------------- diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java index 16b9c6e8599..183f1774ee7 100644 --- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java +++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java @@ -42,6 +42,7 @@ import org.apache.solr.core.CoreContainer; import org.apache.solr.core.SolrCore; import org.apache.solr.logging.MDCLoggingContext; import org.apache.solr.search.SolrIndexSearcher; +import org.apache.solr.update.PeerSync; import org.apache.solr.update.UpdateLog; import org.apache.solr.util.RefCounted; import org.apache.zookeeper.CreateMode; @@ -359,13 +360,15 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, e); } } - + + PeerSync.PeerSyncResult result = null; boolean success = false; try { - success = syncStrategy.sync(zkController, core, leaderProps, weAreReplacement); + result = syncStrategy.sync(zkController, core, leaderProps, weAreReplacement); + success = result.isSuccess(); } catch (Exception e) { SolrException.log(log, "Exception while trying to sync", e); - success = false; + result = PeerSync.PeerSyncResult.failure(); } UpdateLog ulog = core.getUpdateHandler().getUpdateLog(); @@ -382,10 +385,15 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase { if (!hasRecentUpdates) { // we failed sync, but we have no versions - we can't sync in that case // - we were active - // before, so become leader anyway - log.info( - "We failed sync, but we have no versions - we can't sync in that case - we were active before, so become leader anyway"); - success = true; + // before, so become leader anyway if no one else has any versions either + if (result.getOtherHasVersions().orElse(false)) { + log.info("We failed sync, but we have no versions - we can't sync in that case. But others have some versions, so we should not become leader"); + success = false; + } else { + log.info( + "We failed sync, but we have no versions - we can't sync in that case - we were active before, so become leader anyway"); + success = true; + } } } diff --git a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java index f2333eb6ca3..90e515a9fde 100644 --- a/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java +++ b/solr/core/src/java/org/apache/solr/cloud/RecoveryStrategy.java @@ -373,7 +373,7 @@ public class RecoveryStrategy extends Thread implements Closeable { PeerSync peerSync = new PeerSync(core, Collections.singletonList(leaderUrl), ulog.getNumRecordsToKeep(), false, false); peerSync.setStartingVersions(recentVersions); - boolean syncSuccess = peerSync.sync(); + boolean syncSuccess = peerSync.sync().isSuccess(); if (syncSuccess) { SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams()); diff --git a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java index b1d69be9702..6356da77f00 100644 --- a/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java +++ b/solr/core/src/java/org/apache/solr/cloud/SyncStrategy.java @@ -77,23 +77,21 @@ public class SyncStrategy { public String baseUrl; } - public boolean sync(ZkController zkController, SolrCore core, ZkNodeProps leaderProps) { + public PeerSync.PeerSyncResult sync(ZkController zkController, SolrCore core, ZkNodeProps leaderProps) { return sync(zkController, core, leaderProps, false); } - public boolean sync(ZkController zkController, SolrCore core, ZkNodeProps leaderProps, + public PeerSync.PeerSyncResult sync(ZkController zkController, SolrCore core, ZkNodeProps leaderProps, boolean peerSyncOnlyWithActive) { if (SKIP_AUTO_RECOVERY) { - return true; + return PeerSync.PeerSyncResult.success(); } MDCLoggingContext.setCore(core); try { - boolean success; - if (isClosed) { log.warn("Closed, skipping sync up."); - return false; + return PeerSync.PeerSyncResult.failure(); } recoveryRequests.clear(); @@ -102,40 +100,40 @@ public class SyncStrategy { if (core.getUpdateHandler().getUpdateLog() == null) { log.error("No UpdateLog found - cannot sync"); - return false; + return PeerSync.PeerSyncResult.failure(); } - - success = syncReplicas(zkController, core, leaderProps, peerSyncOnlyWithActive); - - return success; + + return syncReplicas(zkController, core, leaderProps, peerSyncOnlyWithActive); } finally { MDCLoggingContext.clear(); } } - private boolean syncReplicas(ZkController zkController, SolrCore core, + private PeerSync.PeerSyncResult syncReplicas(ZkController zkController, SolrCore core, ZkNodeProps leaderProps, boolean peerSyncOnlyWithActive) { boolean success = false; + PeerSync.PeerSyncResult result = null; CloudDescriptor cloudDesc = core.getCoreDescriptor().getCloudDescriptor(); String collection = cloudDesc.getCollectionName(); String shardId = cloudDesc.getShardId(); if (isClosed) { log.info("We have been closed, won't sync with replicas"); - return false; + return PeerSync.PeerSyncResult.failure(); } // first sync ourselves - we are the potential leader after all try { - success = syncWithReplicas(zkController, core, leaderProps, collection, + result = syncWithReplicas(zkController, core, leaderProps, collection, shardId, peerSyncOnlyWithActive); + success = result.isSuccess(); } catch (Exception e) { SolrException.log(log, "Sync Failed", e); } try { if (isClosed) { log.info("We have been closed, won't attempt to sync replicas back to leader"); - return false; + return PeerSync.PeerSyncResult.failure(); } if (success) { @@ -152,17 +150,17 @@ public class SyncStrategy { SolrException.log(log, "Sync Failed", e); } - return success; + return result == null ? PeerSync.PeerSyncResult.failure() : result; } - private boolean syncWithReplicas(ZkController zkController, SolrCore core, + private PeerSync.PeerSyncResult syncWithReplicas(ZkController zkController, SolrCore core, ZkNodeProps props, String collection, String shardId, boolean peerSyncOnlyWithActive) { List nodes = zkController.getZkStateReader() .getReplicaProps(collection, shardId,core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName()); if (nodes == null) { // I have no replicas - return true; + return PeerSync.PeerSyncResult.success(); } List syncWith = new ArrayList<>(nodes.size()); diff --git a/solr/core/src/java/org/apache/solr/handler/admin/RequestSyncShardOp.java b/solr/core/src/java/org/apache/solr/handler/admin/RequestSyncShardOp.java index a40f4f0c1c0..584a7ca7cd1 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/RequestSyncShardOp.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/RequestSyncShardOp.java @@ -65,7 +65,7 @@ class RequestSyncShardOp implements CoreAdminHandler.CoreAdminOp { props.put(ZkStateReader.CORE_NAME_PROP, cname); props.put(ZkStateReader.NODE_NAME_PROP, zkController.getNodeName()); - boolean success = syncStrategy.sync(zkController, core, new ZkNodeProps(props), true); + boolean success = syncStrategy.sync(zkController, core, new ZkNodeProps(props), true).isSuccess(); // solrcloud_debug if (log.isDebugEnabled()) { try { diff --git a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java index 1a76f52399b..4ce4d789ff6 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java +++ b/solr/core/src/java/org/apache/solr/handler/component/RealTimeGetComponent.java @@ -675,7 +675,7 @@ public class RealTimeGetComponent extends SearchComponent boolean cantReachIsSuccess = rb.req.getParams().getBool("cantReachIsSuccess", false); PeerSync peerSync = new PeerSync(rb.req.getCore(), replicas, nVersions, cantReachIsSuccess, true); - boolean success = peerSync.sync(); + boolean success = peerSync.sync().isSuccess(); // TODO: more complex response? rb.rsp.add("sync", success); diff --git a/solr/core/src/java/org/apache/solr/update/PeerSync.java b/solr/core/src/java/org/apache/solr/update/PeerSync.java index dfacd4b89fb..1f61a568b2f 100644 --- a/solr/core/src/java/org/apache/solr/update/PeerSync.java +++ b/solr/core/src/java/org/apache/solr/update/PeerSync.java @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -172,12 +173,42 @@ public class PeerSync { return "PeerSync: core="+uhandler.core.getName()+ " url="+myURL +" "; } + public static class PeerSyncResult { + private final boolean success; + private final Boolean otherHasVersions; + + public PeerSyncResult(boolean success, Boolean otherHasVersions) { + this.success = success; + this.otherHasVersions = otherHasVersions; + } + + public boolean isSuccess() { + return success; + } + + public Optional getOtherHasVersions() { + return Optional.ofNullable(otherHasVersions); + } + + public static PeerSyncResult success() { + return new PeerSyncResult(true, null); + } + + public static PeerSyncResult failure() { + return new PeerSyncResult(false, null); + } + + public static PeerSyncResult failure(boolean otherHasVersions) { + return new PeerSyncResult(false, otherHasVersions); + } + } + /** Returns true if peer sync was successful, meaning that this core may be considered to have the latest updates. * It does not mean that the remote replica is in sync with us. */ - public boolean sync() { + public PeerSyncResult sync() { if (ulog == null) { - return false; + return PeerSyncResult.failure(); } MDCLoggingContext.setCore(core); try { @@ -190,7 +221,7 @@ public class PeerSync { } // check if we already in sync to begin with if(doFingerprint && alreadyInSync()) { - return true; + return PeerSyncResult.success(); } @@ -211,7 +242,7 @@ public class PeerSync { if (startingVersions != null) { if (startingVersions.size() == 0) { log.warn("no frame of reference to tell if we've missed updates"); - return false; + return PeerSyncResult.failure(); } Collections.sort(startingVersions, absComparator); @@ -226,7 +257,7 @@ public class PeerSync { if (Math.abs(startingVersions.get(0)) < smallestNewUpdate) { log.warn(msg() + "too many updates received since start - startingUpdates no longer overlaps with our currentUpdates"); - return false; + return PeerSyncResult.failure(); } // let's merge the lists @@ -248,7 +279,17 @@ public class PeerSync { // we have no versions and hence no frame of reference to tell if we can use a peers // updates to bring us into sync log.info(msg() + "DONE. We have no versions. sync failed."); - return false; + for (;;) { + ShardResponse srsp = shardHandler.takeCompletedOrError(); + if (srsp == null) break; + if (srsp.getException() == null) { + List otherVersions = (List)srsp.getSolrResponse().getResponse().get("versions"); + if (otherVersions != null && !otherVersions.isEmpty()) { + return PeerSyncResult.failure(true); + } + } + } + return PeerSyncResult.failure(false); } } @@ -263,7 +304,7 @@ public class PeerSync { if (!success) { log.info(msg() + "DONE. sync failed"); shardHandler.cancelAll(); - return false; + return PeerSyncResult.failure(); } } @@ -277,7 +318,7 @@ public class PeerSync { } log.info(msg() + "DONE. sync " + (success ? "succeeded" : "failed")); - return success; + return success ? PeerSyncResult.success() : PeerSyncResult.failure(); } finally { MDCLoggingContext.clear(); } diff --git a/solr/core/src/test/org/apache/solr/cloud/TestLeaderElectionWithEmptyReplica.java b/solr/core/src/test/org/apache/solr/cloud/TestLeaderElectionWithEmptyReplica.java new file mode 100644 index 00000000000..84b39014c4d --- /dev/null +++ b/solr/core/src/test/org/apache/solr/cloud/TestLeaderElectionWithEmptyReplica.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.solr.cloud; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.embedded.JettySolrRunner; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.impl.HttpSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.cloud.DocCollection; +import org.apache.solr.common.cloud.Replica; +import org.apache.solr.common.cloud.Slice; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.apache.solr.common.cloud.ZkStateReader.BASE_URL_PROP; + +/** + * See SOLR-9504 + */ +public class TestLeaderElectionWithEmptyReplica extends SolrCloudTestCase { + private static final String COLLECTION_NAME = "solr_9504"; + + @BeforeClass + public static void beforeClass() throws Exception { + useFactory(null); + configureCluster(2) + .addConfig("config", TEST_PATH().resolve("configsets").resolve("cloud-minimal").resolve("conf")) + .configure(); + + CollectionAdminRequest.createCollection(COLLECTION_NAME, "config", 1, 1) + .processAndWait(cluster.getSolrClient(), DEFAULT_TIMEOUT); + + cluster.getSolrClient().waitForState(COLLECTION_NAME, DEFAULT_TIMEOUT, TimeUnit.SECONDS, + (n, c) -> DocCollection.isFullyActive(n, c, 1, 1)); + } + + @Test + public void test() throws Exception { + CloudSolrClient solrClient = cluster.getSolrClient(); + solrClient.setDefaultCollection(COLLECTION_NAME); + for (int i=0; i<10; i++) { + SolrInputDocument doc = new SolrInputDocument(); + doc.addField("id", String.valueOf(i)); + solrClient.add(doc); + } + solrClient.commit(); + + // find the leader node + Replica replica = solrClient.getZkStateReader().getLeaderRetry(COLLECTION_NAME, "shard1"); + JettySolrRunner replicaJetty = null; + List jettySolrRunners = cluster.getJettySolrRunners(); + for (JettySolrRunner jettySolrRunner : jettySolrRunners) { + int port = jettySolrRunner.getBaseUrl().getPort(); + if (replica.getStr(BASE_URL_PROP).contains(":" + port)) { + replicaJetty = jettySolrRunner; + break; + } + } + + // kill the leader + ChaosMonkey.kill(replicaJetty); + + // add a replica (asynchronously) + CollectionAdminRequest.AddReplica addReplica = CollectionAdminRequest.addReplicaToShard(COLLECTION_NAME, "shard1"); + String asyncId = addReplica.processAsync(solrClient); + + // wait a bit + Thread.sleep(1000); + + // bring the old leader node back up + ChaosMonkey.start(replicaJetty); + + // wait until everyone is active + solrClient.waitForState(COLLECTION_NAME, DEFAULT_TIMEOUT, TimeUnit.SECONDS, + (n, c) -> DocCollection.isFullyActive(n, c, 1, 2)); + + // now query each replica and check for consistency + assertConsistentReplicas(solrClient, solrClient.getZkStateReader().getClusterState().getSlice(COLLECTION_NAME, "shard1")); + + // sanity check that documents still exist + QueryResponse response = solrClient.query(new SolrQuery("*:*")); + assertEquals("Indexed documents not found", 10, response.getResults().getNumFound()); + } + + private static int assertConsistentReplicas(CloudSolrClient cloudClient, Slice shard) throws SolrServerException, IOException { + long numFound = Long.MIN_VALUE; + int count = 0; + for (Replica replica : shard.getReplicas()) { + HttpSolrClient client = new HttpSolrClient.Builder(replica.getCoreUrl()) + .withHttpClient(cloudClient.getLbClient().getHttpClient()).build(); + QueryResponse response = client.query(new SolrQuery("q", "*:*", "distrib", "false")); +// log.info("Found numFound={} on replica: {}", response.getResults().getNumFound(), replica.getCoreUrl()); + if (numFound == Long.MIN_VALUE) { + numFound = response.getResults().getNumFound(); + } else { + assertEquals("Shard " + shard.getName() + " replicas do not have same number of documents", numFound, response.getResults().getNumFound()); + } + count++; + } + return count; + } +}