diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index 0bd85b750c5..6a23bb66aa9 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -195,6 +195,8 @@ New Features
   the JSON request. (yonik)
 
+* SOLR-7245: Temporary ZK election or connection loss should not stall indexing
+  due to leader initiated recovery (Ramkumar Aiyengar)
 
 Bug Fixes
 ----------------------
 
diff --git a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
index a5294192646..912116bc407 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ElectionContext.java
@@ -402,8 +402,9 @@ final class ShardLeaderElectionContext extends ShardLeaderElectionContextBase {
           120, coreNodeName);
 
       zkController.ensureReplicaInLeaderInitiatedRecovery(
-          collection, shardId, coreNodeProps, false, coreNodeName);
-      
+          collection, shardId, coreNodeProps, coreNodeName,
+          false /* forcePublishState */, true /* retryOnConnLoss */);
+
       ExecutorService executor = cc.getUpdateShardHandler().getUpdateExecutor();
       executor.execute(lirThread);
     }
diff --git a/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java b/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
index 81598dd742d..c5197349632 100644
--- a/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
+++ b/solr/core/src/java/org/apache/solr/cloud/LeaderInitiatedRecoveryThread.java
@@ -226,8 +226,10 @@ public class LeaderInitiatedRecoveryThread extends Thread {
             // so its state cannot be trusted and it needs to be told to recover again ... and we keep looping here
             log.warn("Replica core={} coreNodeName={} set to active but the leader thinks it should be in recovery;"
                 + " forcing it back to down state to re-run the leader-initiated recovery process; props: "+replicaProps.get(0), coreNeedingRecovery, replicaCoreNodeName);
-            // force republish state to "down"
-            zkController.ensureReplicaInLeaderInitiatedRecovery(collection, shardId, nodeProps, true, leaderCoreNodeName);
+            zkController.ensureReplicaInLeaderInitiatedRecovery(
+                collection, shardId, nodeProps, leaderCoreNodeName,
+                true /* forcePublishState */, true /* retryOnConnLoss */
+            );
           }
         }
         break;
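
Note on the two call sites above: both run on the leader side (the shard leader election context and the leader-initiated recovery thread), so they keep retryOnConnLoss=true and may retry the ZooKeeper write across a connection loss, while the update path later in this patch passes false so an indexing thread is never parked waiting for a ZK reconnect. Because the argument order also changes (leaderCoreNodeName now comes before the booleans), a before/after sketch of the call shape may help while reviewing; it only restates the hunks above and introduces no new API:

    // before: (collection, shardId, replicaCoreProps, forcePublishState, leaderCoreNodeName)
    // after:  (collection, shardId, replicaCoreProps, leaderCoreNodeName, forcePublishState, retryOnConnLoss)
    zkController.ensureReplicaInLeaderInitiatedRecovery(
        collection, shardId, coreNodeProps, coreNodeName,
        false /* forcePublishState */, true /* retryOnConnLoss */);
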
diff --git a/solr/core/src/java/org/apache/solr/cloud/ZkController.java b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
index f484488d66c..aba7bc40731 100644
--- a/solr/core/src/java/org/apache/solr/cloud/ZkController.java
+++ b/solr/core/src/java/org/apache/solr/cloud/ZkController.java
@@ -1130,7 +1130,7 @@ public final class ZkController {
       if (ZkStateReader.ACTIVE.equals(state)) {
         // trying to become active, so leader-initiated state must be recovering
         if (ZkStateReader.RECOVERING.equals(lirState)) {
-          updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.ACTIVE, null);
+          updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.ACTIVE, null, true);
         } else if (ZkStateReader.DOWN.equals(lirState)) {
           throw new SolrException(ErrorCode.INVALID_STATE,
               "Cannot publish state of core '"+cd.getName()+"' as active without recovering first!");
@@ -1138,7 +1138,7 @@
       } else if (ZkStateReader.RECOVERING.equals(state)) {
         // if it is currently DOWN, then trying to enter into recovering state is good
         if (ZkStateReader.DOWN.equals(lirState)) {
-          updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.RECOVERING, null);
+          updateLeaderInitiatedRecoveryState(collection, shardId, coreNodeName, ZkStateReader.RECOVERING, null, true);
         }
       }
     }
@@ -1882,8 +1882,9 @@ public final class ZkController {
    * false means the node is not live either, so no point in trying to send recovery commands
    * to it.
    */
-  public boolean ensureReplicaInLeaderInitiatedRecovery(final String collection,
-      final String shardId, final ZkCoreNodeProps replicaCoreProps, boolean forcePublishState, String leaderCoreNodeName)
+  public boolean ensureReplicaInLeaderInitiatedRecovery(
+      final String collection, final String shardId, final ZkCoreNodeProps replicaCoreProps,
+      String leaderCoreNodeName, boolean forcePublishState, boolean retryOnConnLoss)
       throws KeeperException, InterruptedException {
 
     final String replicaUrl = replicaCoreProps.getCoreUrl();
@@ -1893,10 +1894,10 @@ public final class ZkController {
 
     if (shardId == null)
       throw new IllegalArgumentException("shard parameter cannot be null for starting leader-initiated recovery for replica: "+replicaUrl);
-    
+
     if (replicaUrl == null)
       throw new IllegalArgumentException("replicaUrl parameter cannot be null for starting leader-initiated recovery");
-    
+
     // First, determine if this replica is already in recovery handling
     // which is needed because there can be many concurrent errors flooding in
     // about the same replica having trouble and we only need to send the "needs"
@@ -1918,7 +1919,7 @@
       // we only really need to try to send the recovery command if the node itself is "live"
       if (getZkStateReader().getClusterState().liveNodesContain(replicaNodeName)) {
         // create a znode that requires the replica needs to "ack" to verify it knows it was out-of-sync
-        updateLeaderInitiatedRecoveryState(collection, shardId, replicaCoreNodeName, ZkStateReader.DOWN, leaderCoreNodeName);
+        updateLeaderInitiatedRecoveryState(collection, shardId, replicaCoreNodeName, ZkStateReader.DOWN, leaderCoreNodeName, retryOnConnLoss);
         replicasInLeaderInitiatedRecovery.put(replicaUrl,
             getLeaderInitiatedRecoveryZnodePath(collection, shardId, replicaCoreNodeName));
         log.info("Put replica core={} coreNodeName={} on "+
@@ -2015,7 +2016,7 @@
   }
 
   private void updateLeaderInitiatedRecoveryState(String collection, String shardId, String coreNodeName, String state,
-      String leaderCoreNodeName) {
+      String leaderCoreNodeName, boolean retryOnConnLoss) {
     if (collection == null || shardId == null || coreNodeName == null) {
       log.warn("Cannot set leader-initiated recovery state znode to "+state+" using: collection="+collection+
           "; shardId="+shardId+"; coreNodeName="+coreNodeName);
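
The hunks above only thread retryOnConnLoss through to updateLeaderInitiatedRecoveryState's signature; the body of that method is not shown in this diff. As a rough sketch of where such a flag usually ends up, assuming SolrZkClient's boolean retryOnConnLoss overloads (the helper name and its arguments below are illustrative only, not code from this patch):

    // Hypothetical helper, not in this patch: shows the kind of ZK write the flag would guard.
    void writeLirZnode(SolrZkClient zkClient, String znodePath, byte[] stateJson,
                       boolean retryOnConnLoss) throws KeeperException, InterruptedException {
      if (zkClient.exists(znodePath, retryOnConnLoss)) {
        zkClient.setData(znodePath, stateJson, retryOnConnLoss);   // overwrite the existing LIR state
      } else {
        zkClient.makePath(znodePath, stateJson, retryOnConnLoss);  // create the znode (and parents)
      }
    }

With retryOnConnLoss=false such a write fails fast on a dropped ZK connection instead of blocking until the session is re-established, which is the behavior the update path below relies on.
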
diff --git a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
index 75955bde8d4..3c201ad9152 100644
--- a/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
+++ b/solr/core/src/java/org/apache/solr/update/processor/DistributedUpdateProcessor.java
@@ -52,7 +52,6 @@ import org.apache.solr.core.CoreContainer;
 import org.apache.solr.core.CoreDescriptor;
 import org.apache.solr.handler.component.RealTimeGetComponent;
 import org.apache.solr.request.SolrQueryRequest;
-import org.apache.solr.request.SolrRequestInfo;
 import org.apache.solr.response.SolrQueryResponse;
 import org.apache.solr.schema.SchemaField;
 import org.apache.solr.update.AddUpdateCommand;
@@ -147,7 +146,7 @@ public class DistributedUpdateProcessor extends UpdateRequestProcessor {
     this.nodeErrorTracker = new HashMap<>(5);
     this.otherLeaderRf = new HashMap<>();
   }
-  
+
   // gives the replication factor that was achieved for this request
   public int getAchievedRf() {
     // look across all shards to find the minimum achieved replication
@@ -286,7 +285,7 @@
     returnVersions = req.getParams().getBool(UpdateParams.VERSIONS ,false);
 
     // TODO: better way to get the response, or pass back info to it?
-    SolrRequestInfo reqInfo = returnVersions ? SolrRequestInfo.getRequestInfo() : null;
+    // SolrRequestInfo reqInfo = returnVersions ? SolrRequestInfo.getRequestInfo() : null;
 
     this.req = req;
 
@@ -847,11 +846,19 @@
 
         // before we go setting other replicas to down, make sure we're still the leader!
         String leaderCoreNodeName = null;
+        Exception getLeaderExc = null;
         try {
-          leaderCoreNodeName = zkController.getZkStateReader().getLeaderRetry(collection, shardId).getName();
+          Replica leader = zkController.getZkStateReader().getLeader(collection, shardId);
+          if (leader != null) {
+            leaderCoreNodeName = leader.getName();
+          }
         } catch (Exception exc) {
-          log.error("Failed to determine if " + cloudDesc.getCoreNodeName() + " is still the leader for " + collection +
-              " " + shardId + " before putting " + replicaUrl + " into leader-initiated recovery due to: " + exc);
+          getLeaderExc = exc;
+        }
+        if (leaderCoreNodeName == null) {
+          log.warn("Failed to determine if {} is still the leader for collection={} shardId={} "
+              + "before putting {} into leader-initiated recovery",
+              cloudDesc.getCoreNodeName(), collection, shardId, replicaUrl, getLeaderExc);
         }
 
         List myReplicas = zkController.getZkStateReader().getReplicaProps(collection,
@@ -873,8 +880,10 @@
               zkController.ensureReplicaInLeaderInitiatedRecovery(collection,
                   shardId,
                   stdNode.getNodeProps(),
-                  false,
-                  leaderCoreNodeName);
+                  leaderCoreNodeName,
+                  false /* forcePublishState */,
+                  false /* retryOnConnLoss */
+              );
 
               // we want to try more than once, ~10 minutes
               if (sendRecoveryCommand) {
@@ -909,7 +918,7 @@
         continue; // the replica is already in recovery handling or is not live
 
       Throwable rootCause = SolrException.getRootCause(error.e);
-      log.error("Setting up to try to start recovery on replica " + replicaUrl + " after: " + rootCause);
+      log.error("Setting up to try to start recovery on replica {}", replicaUrl, rootCause);
 
       // try to send the recovery command to the downed replica in a background thread
       CoreContainer coreContainer = req.getCore().getCoreDescriptor().getCoreContainer();
@@ -1591,7 +1600,7 @@
 
     if (!zkEnabled || req.getParams().getBool(COMMIT_END_POINT, false) || singleLeader) {
       doLocalCommit(cmd);
-    } else if (zkEnabled) {
+    } else {
       ModifiableSolrParams params = new ModifiableSolrParams(filterParams(req.getParams()));
       if (!req.getParams().getBool(COMMIT_END_POINT, false)) {
         params.set(COMMIT_END_POINT, true);
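
Besides switching the leader lookup to the non-blocking getLeader() (see the ZkStateReader hunks at the end of this patch), the logging changes in this file move from string concatenation to SLF4J-style parameterized messages. The snippet below only restates that standard slf4j idiom, it is not new API from the patch: the {} placeholder defers string building, and a trailing Throwable argument is logged with its stack trace instead of being flattened into the message text.

    // new form: replicaUrl fills the {}, rootCause is logged as an exception with its stack trace
    log.error("Setting up to try to start recovery on replica {}", replicaUrl, rootCause);
    // old form: the cause was stringified into the message and the stack trace was lost
    log.error("Setting up to try to start recovery on replica " + replicaUrl + " after: " + rootCause);
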
diff --git a/solr/core/src/test/org/apache/solr/cloud/BasicZkTest.java b/solr/core/src/test/org/apache/solr/cloud/BasicZkTest.java
index fb3db969db2..9eb14ef7d60 100644
--- a/solr/core/src/test/org/apache/solr/cloud/BasicZkTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/BasicZkTest.java
@@ -25,7 +25,6 @@ import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.SolrCore;
 import org.apache.solr.request.LocalSolrQueryRequest;
 import org.apache.solr.request.SolrQueryRequest;
-import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -106,7 +105,10 @@ public class BasicZkTest extends AbstractZkTestCase {
     int zkPort = zkServer.getPort();
 
     zkServer.shutdown();
-    
+
+    // document indexing shouldn't stop immediately after a ZK disconnect
+    assertU(adoc("id", "201"));
+
     Thread.sleep(300);
 
     // try a reconnect from disconnect
@@ -174,9 +176,4 @@
     req.setParams(params);
     return req;
   }
-  
-  @AfterClass
-  public static void afterClass() {
-    
-  }
 }
diff --git a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
index 6ec6cd90081..0a82e79c02b 100644
--- a/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/HttpPartitionTest.java
@@ -17,7 +17,6 @@ package org.apache.solr.cloud;
  * limitations under the License.
  */
 
-import org.apache.http.NoHttpResponseException;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.apache.solr.JSONTestUtil;
 import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
@@ -26,7 +25,6 @@ import org.apache.solr.client.solrj.embedded.JettySolrRunner;
 import org.apache.solr.client.solrj.impl.HttpSolrClient;
 import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.request.QueryRequest;
-import org.apache.solr.client.solrj.request.UpdateRequest;
 import org.apache.solr.client.solrj.response.CollectionAdminResponse;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
@@ -146,7 +144,7 @@ public class HttpPartitionTest extends AbstractFullDistribZkTestBase {
     String replicaUrl = replicaCoreNodeProps.getCoreUrl();
 
     assertTrue(!zkController.isReplicaInRecoveryHandling(replicaUrl));
-    assertTrue(zkController.ensureReplicaInLeaderInitiatedRecovery(testCollectionName, shardId, replicaCoreNodeProps, false, leader.getName()));
+    assertTrue(zkController.ensureReplicaInLeaderInitiatedRecovery(testCollectionName, shardId, replicaCoreNodeProps, leader.getName(), false, true));
     assertTrue(zkController.isReplicaInRecoveryHandling(replicaUrl));
     Map lirStateMap = zkController.getLeaderInitiatedRecoveryStateObject(testCollectionName, shardId, notLeader.getName());
     assertNotNull(lirStateMap);
diff --git a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
index 0302572e03b..f43bee8e560 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ZkControllerTest.java
@@ -278,7 +278,7 @@ public class ZkControllerTest extends SolrTestCaseJ4 {
       try {
         // this method doesn't throw exception when node isn't leader
        zkController.ensureReplicaInLeaderInitiatedRecovery("c1", "shard1",
-            new ZkCoreNodeProps(replica), false, "non_existent_leader");
+            new ZkCoreNodeProps(replica), "non_existent_leader", false, false);
        fail("ZkController should not write LIR state for node which is not leader");
       } catch (Exception e) {
         assertNull("ZkController should not write LIR state for node which is not leader",
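
The tests pass the two booleans positionally, which is easy to misread now that the argument order has changed. Annotated with the parameter names from the new ensureReplicaInLeaderInitiatedRecovery signature earlier in this patch, the HttpPartitionTest call reads as follows (annotation only, same call as in the hunk above):

    assertTrue(zkController.ensureReplicaInLeaderInitiatedRecovery(
        testCollectionName, shardId, replicaCoreNodeProps,
        leader.getName(),   // leaderCoreNodeName
        false,              // forcePublishState
        true));             // retryOnConnLoss
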
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index 5d3d9a156b0..89d6e1513a1 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -102,6 +102,8 @@ public class ZkStateReader implements Closeable {
   protected volatile ClusterState clusterState;
 
   private static final long SOLRCLOUD_UPDATE_DELAY = Long.parseLong(System.getProperty("solrcloud.update.delay", "5000"));
+  private static final int GET_LEADER_RETRY_INTERVAL_MS = 50;
+  private static final int GET_LEADER_RETRY_DEFAULT_TIMEOUT = 4000;
 
   public static final String LEADER_ELECT_ZKNODE = "leader_elect";
 
@@ -642,12 +644,22 @@
           shard, timeout));
     return props.getCoreUrl();
   }
-  
+
+  public Replica getLeader(String collection, String shard) throws InterruptedException {
+    if (clusterState != null) {
+      Replica replica = clusterState.getLeader(collection, shard);
+      if (replica != null && getClusterState().liveNodesContain(replica.getNodeName())) {
+        return replica;
+      }
+    }
+    return null;
+  }
+
   /**
    * Get shard leader properties, with retry if none exist.
    */
   public Replica getLeaderRetry(String collection, String shard) throws InterruptedException {
-    return getLeaderRetry(collection, shard, 4000);
+    return getLeaderRetry(collection, shard, GET_LEADER_RETRY_DEFAULT_TIMEOUT);
   }
 
   /**
@@ -655,14 +667,11 @@
    */
   public Replica getLeaderRetry(String collection, String shard, int timeout) throws InterruptedException {
     long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
-    while (System.nanoTime() < timeoutAt && !closed) {
-      if (clusterState != null) {
-        Replica replica = clusterState.getLeader(collection, shard);
-        if (replica != null && getClusterState().liveNodesContain(replica.getNodeName())) {
-          return replica;
-        }
-      }
-      Thread.sleep(50);
+    while (true) {
+      Replica leader = getLeader(collection, shard);
+      if (leader != null) return leader;
+      if (System.nanoTime() >= timeoutAt || closed) break;
+      Thread.sleep(GET_LEADER_RETRY_INTERVAL_MS);
     }
     throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE, "No registered leader was found after waiting for "
         + timeout + "ms " + ", collection: " + collection + " slice: " + shard);
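
Usage sketch of the refactored API (a sketch only, not code from the patch; the method and constant names are the ones introduced above): getLeader() is a single non-blocking look at the cached cluster state and returns null when no live leader is known, while getLeaderRetry() keeps polling through getLeader() every GET_LEADER_RETRY_INTERVAL_MS (50ms) for up to the timeout (4000ms by default) and then throws SERVICE_UNAVAILABLE. Callers that must not stall, such as the update path changed earlier in this patch, use the former and handle null:

    // Sketch: tolerate "no leader known right now" without blocking the calling thread.
    String describeLeader(ZkStateReader reader, String collection, String shard)
        throws InterruptedException {
      Replica leader = reader.getLeader(collection, shard);  // null if unknown or the node is not live
      if (leader == null) {
        return "no registered leader";   // getLeaderRetry() would block and eventually throw here
      }
      return leader.getName();
    }
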